You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2020/03/04 06:08:41 UTC
[hbase] branch branch-2 updated: HBASE-23146 Support CheckAndMutate
with multiple conditions (#1209)
This is an automated email from the ASF dual-hosted git repository.
brfrn169 pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new c3edceb HBASE-23146 Support CheckAndMutate with multiple conditions (#1209)
c3edceb is described below
commit c3edceb6aef1c5a0bae6eeafeb74900895963a88
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Wed Mar 4 15:08:31 2020 +0900
HBASE-23146 Support CheckAndMutate with multiple conditions (#1209)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../org/apache/hadoop/hbase/client/AsyncTable.java | 55 ++++++
.../apache/hadoop/hbase/client/AsyncTableImpl.java | 31 ++++
.../org/apache/hadoop/hbase/client/HTable.java | 123 ++++++++++---
.../hadoop/hbase/client/RawAsyncTableImpl.java | 87 +++++++--
.../java/org/apache/hadoop/hbase/client/Table.java | 48 +++++
.../hbase/shaded/protobuf/RequestConverter.java | 89 ++++-----
.../src/main/protobuf/Client.proto | 9 +-
hbase-protocol/src/main/protobuf/Client.proto | 9 +-
.../hadoop/hbase/rest/client/RemoteHTable.java | 34 ++--
.../hadoop/hbase/coprocessor/RegionObserver.java | 137 +++++++++++++-
.../apache/hadoop/hbase/regionserver/HRegion.java | 79 ++++++--
.../hadoop/hbase/regionserver/RSRpcServices.java | 111 +++++++----
.../apache/hadoop/hbase/regionserver/Region.java | 53 ++++++
.../hbase/regionserver/RegionCoprocessorHost.java | 151 ++++++++++++++-
.../apache/hadoop/hbase/client/TestAsyncTable.java | 203 +++++++++++++++++++++
.../hadoop/hbase/client/TestCheckAndMutate.java | 190 +++++++++++++++++++
.../hbase/client/TestMalformedCellFromClient.java | 5 +-
.../hbase/coprocessor/SimpleRegionObserver.java | 97 ++++++++--
.../coprocessor/TestRegionObserverInterface.java | 51 +++++-
.../hadoop/hbase/regionserver/RegionAsTable.java | 6 +
.../hadoop/hbase/regionserver/TestHRegion.java | 123 +++++++++++++
.../hadoop/hbase/thrift2/client/ThriftTable.java | 6 +
22 files changed, 1493 insertions(+), 204 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index bfcc187..e10f1f82 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@@ -290,6 +291,60 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
}
/**
+ * Atomically checks if a row matches the specified filter. If it does, it adds the
+ * Put/Delete/RowMutations.
+ * <p>
+ * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
+ * execute it. This is a fluent style API, the code is like:
+ *
+ * <pre>
+ * <code>
+ * table.checkAndMutate(row, filter).thenPut(put)
+ * .thenAccept(succ -> {
+ * if (succ) {
+ * System.out.println("Check and put succeeded");
+ * } else {
+ * System.out.println("Check and put failed");
+ * }
+ * });
+ * </code>
+ * </pre>
+ */
+ CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
+
+ /**
+ * A helper class for sending checkAndMutate request with a filter.
+ */
+ interface CheckAndMutateWithFilterBuilder {
+
+ /**
+ * @param timeRange time range to check.
+ */
+ CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
+
+ /**
+ * @param put data to put if check succeeds
+ * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
+ * will be wrapped by a {@link CompletableFuture}.
+ */
+ CompletableFuture<Boolean> thenPut(Put put);
+
+ /**
+ * @param delete data to delete if check succeeds
+ * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
+ * value will be wrapped by a {@link CompletableFuture}.
+ */
+ CompletableFuture<Boolean> thenDelete(Delete delete);
+
+ /**
+ * @param mutation mutations to perform if check succeeds
+ * @return true if the new mutation was executed, false otherwise. The return value will be
+ * wrapped by a {@link CompletableFuture}.
+ */
+ CompletableFuture<Boolean> thenMutate(RowMutations mutation);
+ }
+
+ /**
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.
* @param mutation object that specifies the set of mutations to perform atomically
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 2256a4c..d6406b6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -174,6 +175,36 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
@Override
+ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+ return new CheckAndMutateWithFilterBuilder() {
+
+ private final CheckAndMutateWithFilterBuilder builder =
+ rawTable.checkAndMutate(row, filter);
+
+ @Override
+ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
+ builder.timeRange(timeRange);
+ return this;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> thenPut(Put put) {
+ return wrap(builder.thenPut(put));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> thenDelete(Delete delete) {
+ return wrap(builder.thenDelete(delete));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
+ return wrap(builder.thenMutate(mutation));
+ }
+ };
+ }
+
+ @Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 54ab606..9b30de6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -39,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -676,14 +675,16 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
+ return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
+ put);
}
@Override
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
+ return doCheckAndPut(row, family, qualifier, toCompareOperator(compareOp), value, null,
+ null, put);
}
@Override
@@ -692,21 +693,20 @@ public class HTable implements Table {
final CompareOperator op, final byte [] value, final Put put) throws IOException {
// The name of the operators in CompareOperator are intentionally those of the
// operators in the filter's CompareOp enum.
- return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
+ return doCheckAndPut(row, family, qualifier, op, value, null, null, put);
}
private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
- final String opName, final byte[] value, final TimeRange timeRange, final Put put)
- throws IOException {
+ final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
+ final Put put) throws IOException {
ClientServiceCallable<Boolean> callable =
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected Boolean rpcCall() throws Exception {
- CompareType compareType = CompareType.valueOf(opName);
MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, timeRange, put);
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
+ filter, timeRange, put);
MutateResponse response = doMutate(request);
return Boolean.valueOf(response.getProcessed());
}
@@ -719,37 +719,37 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
- delete);
+ return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, null,
+ null, delete);
}
@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
+ return doCheckAndDelete(row, family, qualifier, toCompareOperator(compareOp), value, null,
+ null, delete);
}
@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
+ return doCheckAndDelete(row, family, qualifier, op, value, null, null, delete);
}
private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
- final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
- throws IOException {
+ final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
+ final Delete delete) throws IOException {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
- CompareType compareType = CompareType.valueOf(opName);
MutateRequest request = RequestConverter
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
- qualifier, new BinaryComparator(value), compareType, timeRange, delete);
+ qualifier, op, value, filter, timeRange, delete);
MutateResponse response = doMutate(request);
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
@@ -776,8 +776,14 @@ public class HTable implements Table {
return new CheckAndMutateBuilderImpl(row, family);
}
+ @Override
+ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+ return new CheckAndMutateWithFilterBuilderImpl(row, filter);
+ }
+
private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
- final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm)
+ final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
+ final RowMutations rm)
throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
@@ -785,10 +791,9 @@ public class HTable implements Table {
rm.getMaxPriority()) {
@Override
protected MultiResponse rpcCall() throws Exception {
- CompareType compareType = CompareType.valueOf(opName);
MultiRequest request = RequestConverter
- .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, timeRange, rm);
+ .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
+ qualifier, op, value, filter, timeRange, rm);
ClientProtos.MultiResponse response = doMulti(request);
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
if (res.hasException()) {
@@ -833,14 +838,43 @@ public class HTable implements Table {
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm)
throws IOException {
- return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
+ return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
+ null, rm);
}
@Override
@Deprecated
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
- return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
+ return doCheckAndMutate(row, family, qualifier, op, value, null, null, rm);
+ }
+
+ private CompareOperator toCompareOperator(CompareOp compareOp) {
+ switch (compareOp) {
+ case LESS:
+ return CompareOperator.LESS;
+
+ case LESS_OR_EQUAL:
+ return CompareOperator.LESS_OR_EQUAL;
+
+ case EQUAL:
+ return CompareOperator.EQUAL;
+
+ case NOT_EQUAL:
+ return CompareOperator.NOT_EQUAL;
+
+ case GREATER_OR_EQUAL:
+ return CompareOperator.GREATER_OR_EQUAL;
+
+ case GREATER:
+ return CompareOperator.GREATER;
+
+ case NO_OP:
+ return CompareOperator.NO_OP;
+
+ default:
+ throw new AssertionError();
+ }
}
@Override
@@ -1247,19 +1281,54 @@ public class HTable implements Table {
public boolean thenPut(Put put) throws IOException {
validatePut(put);
preCheck();
- return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
+ return doCheckAndPut(row, family, qualifier, op, value, null, timeRange, put);
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
preCheck();
- return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete);
+ return doCheckAndDelete(row, family, qualifier, op, value, null, timeRange, delete);
}
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
preCheck();
- return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation);
+ return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange,
+ mutation);
+ }
+ }
+
+ private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder {
+
+ private final byte[] row;
+ private final Filter filter;
+ private TimeRange timeRange;
+
+ CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
+ this.row = Preconditions.checkNotNull(row, "row is null");
+ this.filter = Preconditions.checkNotNull(filter, "filter is null");
+ }
+
+ @Override
+ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
+ this.timeRange = timeRange;
+ return this;
+ }
+
+ @Override
+ public boolean thenPut(Put put) throws IOException {
+ validatePut(put);
+ return doCheckAndPut(row, null, null, null, null, filter, timeRange, put);
+ }
+
+ @Override
+ public boolean thenDelete(Delete delete) throws IOException {
+ return doCheckAndDelete(row, null, null, null, null, filter, timeRange, delete);
+ }
+
+ @Override
+ public boolean thenMutate(RowMutations mutation) throws IOException {
+ return doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation);
}
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 8050137..b4a6cc4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
@@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRespo
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
/**
* The implementation of RawAsyncTable.
@@ -358,10 +357,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
+ .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
- (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
+ (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+ null, timeRange, p),
(c, r) -> r.getProcessed()))
.call();
}
@@ -370,10 +369,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
+ .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
- (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
+ (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+ null, timeRange, d),
(c, r) -> r.getProcessed()))
.call();
}
@@ -381,12 +380,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
- return RawAsyncTableImpl.this
- .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller,
+ return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
+ rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
- (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
+ (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+ null, timeRange, rm),
resp -> resp.getExists()))
.call();
}
@@ -397,6 +396,68 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return new CheckAndMutateBuilderImpl(row, family);
}
+
+ private final class CheckAndMutateWithFilterBuilderImpl
+ implements CheckAndMutateWithFilterBuilder {
+
+ private final byte[] row;
+
+ private final Filter filter;
+
+ private TimeRange timeRange;
+
+ public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
+ this.row = Preconditions.checkNotNull(row, "row is null");
+ this.filter = Preconditions.checkNotNull(filter, "filter is null");
+ }
+
+ @Override
+ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
+ this.timeRange = timeRange;
+ return this;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> thenPut(Put put) {
+ validatePut(put, conn.connConf.getMaxKeyValueSize());
+ return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
+ stub, put,
+ (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
+ filter, timeRange, p),
+ (c, r) -> r.getProcessed()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> thenDelete(Delete delete) {
+ return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
+ loc, stub, delete,
+ (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
+ filter, timeRange, d),
+ (c, r) -> r.getProcessed()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
+ return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
+ rpcTimeoutNs)
+ .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
+ loc, stub, mutation,
+ (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
+ filter, timeRange, rm),
+ resp -> resp.getExists()))
+ .call();
+ }
+ }
+
+ @Override
+ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+ return new CheckAndMutateWithFilterBuilderImpl(row, filter);
+ }
+
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method.
private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index a6c10ea..cc2d414 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
@@ -541,6 +542,53 @@ public interface Table extends Closeable {
* @return {@code true} if the new delete was executed, {@code false} otherwise.
*/
boolean thenDelete(Delete delete) throws IOException;
+
+ /**
+ * @param mutation mutations to perform if check succeeds
+ * @return true if the new mutation was executed, false otherwise.
+ */
+ boolean thenMutate(RowMutations mutation) throws IOException;
+ }
+
+ /**
+ * Atomically checks if a row matches the specified filter. If it does, it adds the
+ * Put/Delete/RowMutations.
+ * <p>
+ * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
+ * execute it. This is a fluent style API, the code is like:
+ *
+ * <pre>
+ * <code>
+ * table.checkAndMutate(row, filter).thenPut(put);
+ * </code>
+ * </pre>
+ */
+ default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+ throw new NotImplementedException("Add an implementation!");
+ }
+
+ /**
+ * A helper class for sending checkAndMutate request with a filter.
+ */
+ interface CheckAndMutateWithFilterBuilder {
+
+ /**
+ * @param timeRange timeRange to check
+ */
+ CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
+
+ /**
+ * @param put data to put if check succeeds
+ * @return {@code true} if the new put was executed, {@code false} otherwise.
+ */
+ boolean thenPut(Put put) throws IOException;
+
+ /**
+ * @param delete data to delete if check succeeds
+ * @return {@code true} if the new delete was executed, {@code false} otherwise.
+ */
+ boolean thenDelete(Delete delete) throws IOException;
+
/**
* @param mutation mutations to perform if check succeeds
* @return true if the new mutation was executed, false otherwise.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index afdd653..f293bcf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -56,7 +57,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -236,71 +238,51 @@ public final class RequestConverter {
/**
* Create a protocol buffer MutateRequest for a conditioned put
*
- * @param regionName
- * @param row
- * @param family
- * @param qualifier
- * @param comparator
- * @param compareType
- * @param put
* @return a mutate request
* @throws IOException
*/
public static MutateRequest buildMutateRequest(
- final byte[] regionName, final byte[] row, final byte[] family,
- final byte [] qualifier, final ByteArrayComparable comparator,
- final CompareType compareType, TimeRange timeRange, final Put put) throws IOException {
- return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
- , put, MutationType.PUT);
+ final byte[] regionName, final byte[] row, final byte[] family,
+ final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+ final TimeRange timeRange, final Put put) throws IOException {
+ return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
+ put, MutationType.PUT);
}
/**
* Create a protocol buffer MutateRequest for a conditioned delete
*
- * @param regionName
- * @param row
- * @param family
- * @param qualifier
- * @param comparator
- * @param compareType
- * @param delete
* @return a mutate request
* @throws IOException
*/
public static MutateRequest buildMutateRequest(
- final byte[] regionName, final byte[] row, final byte[] family,
- final byte [] qualifier, final ByteArrayComparable comparator,
- final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException {
- return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
- , delete, MutationType.DELETE);
+ final byte[] regionName, final byte[] row, final byte[] family,
+ final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+ final TimeRange timeRange, final Delete delete) throws IOException {
+ return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
+ delete, MutationType.DELETE);
}
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
- final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator,
- final CompareType compareType, TimeRange timeRange, final Mutation mutation,
+ final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
+ final Filter filter, final TimeRange timeRange, final Mutation mutation,
final MutationType type) throws IOException {
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setMutation(ProtobufUtil.toMutation(type, mutation))
- .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
+ .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
+
/**
* Create a protocol buffer MutateRequest for conditioned row mutations
*
- * @param regionName
- * @param row
- * @param family
- * @param qualifier
- * @param comparator
- * @param compareType
- * @param rowMutations
* @return a mutate request
* @throws IOException
*/
public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
final byte[] row, final byte[] family, final byte[] qualifier,
- final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange,
+ final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
final RowMutations rowMutations) throws IOException {
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
@@ -308,7 +290,7 @@ public final class RequestConverter {
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
- MutationType mutateType = null;
+ MutationType mutateType;
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
@@ -324,7 +306,7 @@ public final class RequestConverter {
builder.addAction(actionBuilder.build());
}
return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
- .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
+ .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
@@ -1080,25 +1062,26 @@ public final class RequestConverter {
/**
* Create a protocol buffer Condition
*
- * @param row
- * @param family
- * @param qualifier
- * @param comparator
- * @param compareType
* @return a Condition
* @throws IOException
*/
public static Condition buildCondition(final byte[] row, final byte[] family,
- final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType,
- final TimeRange timeRange) {
- return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row))
- .setFamily(UnsafeByteOperations.unsafeWrap(family))
- .setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ?
- HConstants.EMPTY_BYTE_ARRAY : qualifier))
- .setComparator(ProtobufUtil.toComparator(comparator))
- .setCompareType(compareType)
- .setTimeRange(ProtobufUtil.toTimeRange(timeRange))
- .build();
+ final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+ final TimeRange timeRange) throws IOException {
+
+ Condition.Builder builder = Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row));
+
+ if (filter != null) {
+ builder.setFilter(ProtobufUtil.toFilter(filter));
+ } else {
+ builder.setFamily(UnsafeByteOperations.unsafeWrap(family))
+ .setQualifier(UnsafeByteOperations.unsafeWrap(
+ qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
+ .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)))
+ .setCompareType(CompareType.valueOf(op.name()));
+ }
+
+ return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
}
/**
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index a22c623..810aaaa 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -139,11 +139,12 @@ message GetResponse {
*/
message Condition {
required bytes row = 1;
- required bytes family = 2;
- required bytes qualifier = 3;
- required CompareType compare_type = 4;
- required Comparator comparator = 5;
+ optional bytes family = 2;
+ optional bytes qualifier = 3;
+ optional CompareType compare_type = 4;
+ optional Comparator comparator = 5;
optional TimeRange time_range = 6;
+ optional Filter filter = 7;
}
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 3681ae9..b985582 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -139,11 +139,12 @@ message GetResponse {
*/
message Condition {
required bytes row = 1;
- required bytes family = 2;
- required bytes qualifier = 3;
- required CompareType compare_type = 4;
- required Comparator comparator = 5;
+ optional bytes family = 2;
+ optional bytes qualifier = 3;
+ optional CompareType compare_type = 4;
+ optional Comparator comparator = 5;
optional TimeRange time_range = 6;
+ optional Filter filter = 7;
}
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index d4feb5e..c77f463 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -24,6 +24,19 @@ import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -33,12 +46,13 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
@@ -65,19 +79,6 @@ import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
@@ -792,6 +793,11 @@ public class RemoteHTable implements Table {
}
@Override
+ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+ throw new NotImplementedException("Implement later");
+ }
+
+ @Override
@Deprecated
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 8761d6b..05071fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -511,9 +512,8 @@ public interface RegionObserver {
* @param op the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
- * @param result
- * @return the return value to return to client if bypassing default
- * processing
+ * @param result the default value of the result
+ * @return the return value to return to client if bypassing default processing
*/
default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
@@ -522,6 +522,26 @@ public interface RegionObserver {
}
/**
+ * Called before checkAndPut.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions.
+ * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+ * <p>
+ * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
+ * If need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param row row to check
+ * @param filter filter
+ * @param put data to put if check succeeds
+ * @param result the default value of the result
+ * @return the return value to return to client if bypassing default processing
+ */
+ default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ Filter filter, Put put, boolean result) throws IOException {
+ return result;
+ }
+
+ /**
* Called before checkAndPut but after acquiring rowlock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
@@ -540,9 +560,8 @@ public interface RegionObserver {
* @param op the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
- * @param result
- * @return the return value to return to client if bypassing default
- * processing
+ * @param result the default value of the result
+ * @return the return value to return to client if bypassing default processing
*/
default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
@@ -551,6 +570,30 @@ public interface RegionObserver {
}
/**
+ * Called before checkAndPut but after acquiring rowlock.
+ * <p>
+ * <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
+ * Row will be locked for longer time. Trying to acquire lock on another row, within this,
+ * can lead to potential deadlock.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions.
+ * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+ * <p>
+ * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
+ * If need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param row row to check
+ * @param filter filter
+ * @param put data to put if check succeeds
+ * @param result the default value of the result
+ * @return the return value to return to client if bypassing default processing
+ */
+ default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+ byte[] row, Filter filter, Put put, boolean result) throws IOException {
+ return result;
+ }
+
+ /**
* Called after checkAndPut
* <p>
* Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
@@ -572,6 +615,23 @@ public interface RegionObserver {
}
/**
+ * Called after checkAndPut
+ * <p>
+ * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
+ * If need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param row row to check
+ * @param filter filter
+ * @param put data to put if check succeeds
+ * @param result from the checkAndPut
+ * @return the possibly transformed return value to return to client
+ */
+ default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ Filter filter, Put put, boolean result) throws IOException {
+ return result;
+ }
+
+ /**
* Called before checkAndDelete.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
@@ -586,7 +646,7 @@ public interface RegionObserver {
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
- * @param result
+ * @param result the default value of the result
* @return the value to return to client if bypassing default processing
*/
default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
@@ -596,6 +656,26 @@ public interface RegionObserver {
}
/**
+ * Called before checkAndDelete.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions.
+ * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+ * <p>
+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
+ * If need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param row row to check
+ * @param filter column family
+ * @param delete delete to commit if check succeeds
+ * @param result the default value of the result
+ * @return the value to return to client if bypassing default processing
+ */
+ default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ Filter filter, Delete delete, boolean result) throws IOException {
+ return result;
+ }
+
+ /**
* Called before checkAndDelete but after acquiring rowock.
* <p>
* <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
@@ -614,7 +694,7 @@ public interface RegionObserver {
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
- * @param result
+ * @param result the default value of the result
* @return the value to return to client if bypassing default processing
*/
default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
@@ -624,6 +704,30 @@ public interface RegionObserver {
}
/**
+ * Called before checkAndDelete but after acquiring rowock.
+ * <p>
+ * <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
+ * Row will be locked for longer time. Trying to acquire lock on another row, within this,
+ * can lead to potential deadlock.
+ * <p>
+ * Call CoprocessorEnvironment#bypass to skip default actions.
+ * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+ * <p>
+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
+ * If need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param row row to check
+ * @param filter filter
+ * @param delete delete to commit if check succeeds
+ * @param result the default value of the result
+ * @return the value to return to client if bypassing default processing
+ */
+ default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+ byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
+ return result;
+ }
+
+ /**
* Called after checkAndDelete
* <p>
* Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
@@ -645,6 +749,23 @@ public interface RegionObserver {
}
/**
+ * Called after checkAndDelete
+ * <p>
+ * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
+ * If need a Cell reference for later use, copy the cell and use that.
+ * @param c the environment provided by the region server
+ * @param row row to check
+ * @param filter filter
+ * @param delete delete to commit if check succeeds
+ * @param result from the CheckAndDelete
+ * @return the possibly transformed returned value to return to client
+ */
+ default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ Filter filter, Delete delete, boolean result) throws IOException {
+ return result;
+ }
+
+ /**
* Called before Append.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dac034d..37adbca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HFileLink;
@@ -4176,13 +4177,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
checkMutationType(mutation, row);
- return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation);
+ return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null,
+ mutation);
+ }
+
+ @Override
+ public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation)
+ throws IOException {
+ return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation);
}
@Override
public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
- return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null);
+ return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null);
+ }
+
+ @Override
+ public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm)
+ throws IOException {
+ return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null);
}
/**
@@ -4190,7 +4204,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* switches in the few places where there is deviation.
*/
private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange,
+ CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
RowMutations rowMutations, Mutation mutation)
throws IOException {
// Could do the below checks but seems wacky with two callers only. Just comment out for now.
@@ -4204,8 +4218,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation();
try {
Get get = new Get(row);
- checkFamily(family);
- get.addColumn(family, qualifier);
+ if (family != null) {
+ checkFamily(family);
+ get.addColumn(family, qualifier);
+ }
+ if (filter != null) {
+ get.setFilter(filter);
+ }
if (timeRange != null) {
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
@@ -4217,11 +4236,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Call coprocessor.
Boolean processed = null;
if (mutation instanceof Put) {
- processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
- qualifier, op, comparator, (Put)mutation);
+ if (filter != null) {
+ processed = this.getCoprocessorHost()
+ .preCheckAndPutAfterRowLock(row, filter, (Put) mutation);
+ } else {
+ processed = this.getCoprocessorHost()
+ .preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator,
+ (Put) mutation);
+ }
} else if (mutation instanceof Delete) {
- processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
- qualifier, op, comparator, (Delete)mutation);
+ if (filter != null) {
+ processed = this.getCoprocessorHost()
+ .preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation);
+ } else {
+ processed = this.getCoprocessorHost()
+ .preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator,
+ (Delete) mutation);
+ }
}
if (processed != null) {
return processed;
@@ -4231,20 +4262,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Supposition is that now all changes are done under row locks, then when we go to read,
// we'll get the latest on this row.
List<Cell> result = get(get, false);
- boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
boolean matches = false;
long cellTs = 0;
- if (result.isEmpty() && valueIsNull) {
- matches = true;
- } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
- matches = true;
- cellTs = result.get(0).getTimestamp();
- } else if (result.size() == 1 && !valueIsNull) {
- Cell kv = result.get(0);
- cellTs = kv.getTimestamp();
- int compareResult = PrivateCellUtil.compareValue(kv, comparator);
- matches = matches(op, compareResult);
+ if (filter != null) {
+ if (!result.isEmpty()) {
+ matches = true;
+ cellTs = result.get(0).getTimestamp();
+ }
+ } else {
+ boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
+ if (result.isEmpty() && valueIsNull) {
+ matches = true;
+ } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
+ matches = true;
+ cellTs = result.get(0).getTimestamp();
+ } else if (result.size() == 1 && !valueIsNull) {
+ Cell kv = result.get(0);
+ cellTs = kv.getTimestamp();
+ int compareResult = PrivateCellUtil.compareValue(kv, comparator);
+ matches = matches(op, compareResult);
+ }
}
+
// If matches put the new put or delete the new delete
if (matches) {
// We have acquired the row lock already. If the system clock is NOT monotonically
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index ed4ead0..ab1aea3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -613,9 +614,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param cellScanner if non-null, the mutation data -- the Cell content.
*/
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
- final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
- ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
- ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
+ final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
+ CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
+ RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
+ throws IOException {
int countOfCompleteMutation = 0;
try {
if (!region.getRegionInfo().isMetaRegion()) {
@@ -658,7 +660,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.addResultOrException(
resultOrExceptionOrBuilder.build());
}
- return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
+
+ if (filter != null) {
+ return region.checkAndRowMutate(row, filter, timeRange, rm);
+ } else {
+ return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
+ }
} finally {
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
// even if the malformed cells are not skipped.
@@ -2728,18 +2735,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.getFamily().toByteArray();
- byte[] qualifier = condition.getQualifier().toByteArray();
- CompareOperator op =
- CompareOperator.valueOf(condition.getCompareType().name());
- ByteArrayComparable comparator =
- ProtobufUtil.toComparator(condition.getComparator());
+ byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+ byte[] qualifier = condition.hasQualifier() ?
+ condition.getQualifier().toByteArray() : null;
+ CompareOperator op = condition.hasCompareType() ?
+ CompareOperator.valueOf(condition.getCompareType().name()) : null;
+ ByteArrayComparable comparator = condition.hasComparator() ?
+ ProtobufUtil.toComparator(condition.getComparator()) : null;
+ Filter filter = condition.hasFilter() ?
+ ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
processed =
checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
- qualifier, op, comparator, timeRange, regionActionResultBuilder,
+ qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
spaceQuotaEnforcement);
} else {
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
@@ -2878,24 +2888,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.getFamily().toByteArray();
- byte[] qualifier = condition.getQualifier().toByteArray();
- CompareOperator compareOp =
- CompareOperator.valueOf(condition.getCompareType().name());
- ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
+ byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+ byte[] qualifier = condition.hasQualifier() ?
+ condition.getQualifier().toByteArray() : null;
+ CompareOperator op = condition.hasCompareType() ?
+ CompareOperator.valueOf(condition.getCompareType().name()) : null;
+ ByteArrayComparable comparator = condition.hasComparator() ?
+ ProtobufUtil.toComparator(condition.getComparator()) : null;
+ Filter filter = condition.hasFilter() ?
+ ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
if (region.getCoprocessorHost() != null) {
- processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
- compareOp, comparator, put);
+ if (filter != null) {
+ processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
+ } else {
+ processed = region.getCoprocessorHost()
+ .preCheckAndPut(row, family, qualifier, op, comparator, put);
+ }
}
if (processed == null) {
- boolean result = region.checkAndMutate(row, family,
- qualifier, compareOp, comparator, timeRange, put);
+ boolean result;
+ if (filter != null) {
+ result = region.checkAndMutate(row, filter, timeRange, put);
+ } else {
+ result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
+ put);
+ }
if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postCheckAndPut(row, family,
- qualifier, compareOp, comparator, put, result);
+ if (filter != null) {
+ result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
+ } else {
+ result = region.getCoprocessorHost()
+ .postCheckAndPut(row, family, qualifier, op, comparator, put, result);
+ }
}
processed = result;
}
@@ -2912,23 +2939,42 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.getFamily().toByteArray();
- byte[] qualifier = condition.getQualifier().toByteArray();
- CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
- ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
+ byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+ byte[] qualifier = condition.hasQualifier() ?
+ condition.getQualifier().toByteArray() : null;
+ CompareOperator op = condition.hasCompareType() ?
+ CompareOperator.valueOf(condition.getCompareType().name()) : null;
+ ByteArrayComparable comparator = condition.hasComparator() ?
+ ProtobufUtil.toComparator(condition.getComparator()) : null;
+ Filter filter = condition.hasFilter() ?
+ ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
ProtobufUtil.toTimeRange(condition.getTimeRange()) :
TimeRange.allTime();
if (region.getCoprocessorHost() != null) {
- processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
- comparator, delete);
+ if (filter != null) {
+ processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
+ } else {
+ processed = region.getCoprocessorHost()
+ .preCheckAndDelete(row, family, qualifier, op, comparator, delete);
+ }
}
if (processed == null) {
- boolean result = region.checkAndMutate(row, family,
- qualifier, op, comparator, timeRange, delete);
+ boolean result;
+ if (filter != null) {
+ result = region.checkAndMutate(row, filter, timeRange, delete);
+ } else {
+ result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
+ delete);
+ }
if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postCheckAndDelete(row, family,
- qualifier, op, comparator, delete, result);
+ if (filter != null) {
+ result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
+ result);
+ } else {
+ result = region.getCoprocessorHost()
+ .postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
+ }
}
processed = result;
}
@@ -2979,7 +3025,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
break;
default:
break;
-
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index ecc2158..1790468 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.yetus.audience.InterfaceAudience;
@@ -331,6 +332,31 @@ public interface Region extends ConfigurationObserver {
ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
/**
+ * Atomically checks if a row matches the filter and if it does, it performs the mutation. See
+ * checkAndRowMutate to do many checkAndPuts at a time on a single row.
+ * @param row to check
+ * @param filter the filter
+ * @param mutation data to put if check succeeds
+ * @return true if mutation was applied, false otherwise
+ */
+ default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation)
+ throws IOException {
+ return checkAndMutate(row, filter, TimeRange.allTime(), mutation);
+ }
+
+ /**
+ * Atomically checks if a row value matches the filter and if it does, it performs the mutation.
+ * See checkAndRowMutate to do many checkAndPuts at a time on a single row.
+ * @param row to check
+ * @param filter the filter
+ * @param mutation data to put if check succeeds
+ * @param timeRange time range to check
+ * @return true if mutation was applied, false otherwise
+ */
+ boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation)
+ throws IOException;
+
+ /**
* Atomically checks if a row/family/qualifier value matches the expected values and if it does,
* it performs the row mutations. If the passed value is null, the lack of column value
* (ie: non-existence) is used. Use to do many mutations on a single row. Use checkAndMutate
@@ -368,6 +394,33 @@ public interface Region extends ConfigurationObserver {
throws IOException;
/**
+ * Atomically checks if a row matches the filter and if it does, it performs the row mutations.
+ * Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
+ * time.
+ * @param row to check
+ * @param filter the filter
+ * @param mutations data to put if check succeeds
+ * @return true if mutations were applied, false otherwise
+ */
+ default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations)
+ throws IOException {
+ return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations);
+ }
+
+ /**
+ * Atomically checks if a row matches the filter and if it does, it performs the row mutations.
+ * Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
+ * time.
+ * @param row to check
+ * @param filter the filter
+ * @param mutations data to put if check succeeds
+ * @param timeRange time range to check
+ * @return true if mutations were applied, false otherwise
+ */
+ boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange,
+ RowMutations mutations) throws IOException;
+
+ /**
* Deletes the specified cells/row.
* @param delete
* @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index c75e562..331e7cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -1082,13 +1083,38 @@ public class RegionCoprocessorHost
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
+ * @param filter filter
+ * @param put data to put if check succeeds
+ * @return true or false to return to client if default processing should be bypassed, or null
+ * otherwise
+ */
+ public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put)
+ throws IOException {
+ boolean bypassable = true;
+ boolean defaultResult = false;
+ if (coprocEnvironments.isEmpty()) {
+ return null;
+ }
+ return execOperationWithResult(
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
+ defaultResult, bypassable) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndPut(this, row, filter, put, getResult());
+ }
+ });
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param op the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds
* @return true or false to return to client if default processing should be bypassed, or null
- * otherwise
+ * otherwise
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="Null is legit")
@@ -1112,6 +1138,33 @@ public class RegionCoprocessorHost
}
/**
+ * Supports Coprocessor 'bypass'.
+ * @param row row to check
+ * @param filter filter
+ * @param put data to put if check succeeds
+ * @return true or false to return to client if default processing should be bypassed, or null
+ * otherwise
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
+ justification="Null is legit")
+ public Boolean preCheckAndPutAfterRowLock(
+ final byte[] row, final Filter filter, final Put put) throws IOException {
+ boolean bypassable = true;
+ boolean defaultResult = false;
+ if (coprocEnvironments.isEmpty()) {
+ return null;
+ }
+ return execOperationWithResult(
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
+ defaultResult, bypassable) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult());
+ }
+ });
+ }
+
+ /**
* @param row row to check
* @param family column family
* @param qualifier column qualifier
@@ -1138,6 +1191,26 @@ public class RegionCoprocessorHost
}
/**
+ * @param row row to check
+ * @param filter filter
+ * @param put data to put if check succeeds
+ * @throws IOException e
+ */
+ public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put,
+ boolean result) throws IOException {
+ if (this.coprocEnvironments.isEmpty()) {
+ return result;
+ }
+ return execOperationWithResult(
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.postCheckAndPut(this, row, filter, put, getResult());
+ }
+ });
+ }
+
+ /**
* Supports Coprocessor 'bypass'.
* @param row row to check
* @param family column family
@@ -1145,8 +1218,8 @@ public class RegionCoprocessorHost
* @param op the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds
- * @return true or false to return to client if default processing should be bypassed,
- * or null otherwise
+ * @return true or false to return to client if default processing should be bypassed, or null
+ * otherwise
*/
public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOperator op,
@@ -1171,6 +1244,31 @@ public class RegionCoprocessorHost
/**
* Supports Coprocessor 'bypass'.
* @param row row to check
+ * @param filter filter
+ * @param delete delete to commit if check succeeds
+ * @return true or false to return to client if default processing should be bypassed, or null
+ * otherwise
+ */
+ public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete)
+ throws IOException {
+ boolean bypassable = true;
+ boolean defaultResult = false;
+ if (coprocEnvironments.isEmpty()) {
+ return null;
+ }
+ return execOperationWithResult(
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
+ defaultResult, bypassable) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndDelete(this, row, filter, delete, getResult());
+ }
+ });
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param op the comparison operation
@@ -1201,6 +1299,33 @@ public class RegionCoprocessorHost
}
/**
+ * Supports Coprocessor 'bypass'.
+ * @param row row to check
+ * @param filter filter
+ * @param delete delete to commit if check succeeds
+ * @return true or false to return to client if default processing should be bypassed,
+ * or null otherwise
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
+ justification="Null is legit")
+ public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter,
+ final Delete delete) throws IOException {
+ boolean bypassable = true;
+ boolean defaultResult = false;
+ if (coprocEnvironments.isEmpty()) {
+ return null;
+ }
+ return execOperationWithResult(
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
+ defaultResult, bypassable) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult());
+ }
+ });
+ }
+
+ /**
* @param row row to check
* @param family column family
* @param qualifier column qualifier
@@ -1227,6 +1352,26 @@ public class RegionCoprocessorHost
}
/**
+ * @param row row to check
+ * @param filter filter
+ * @param delete delete to commit if check succeeds
+ * @throws IOException e
+ */
+ public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete,
+ boolean result) throws IOException {
+ if (this.coprocEnvironments.isEmpty()) {
+ return result;
+ }
+ return execOperationWithResult(
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.postCheckAndDelete(this, row, filter, delete, getResult());
+ }
+ });
+ }
+
+ /**
* Supports Coprocessor 'bypass'.
* @param append append object
* @return result to return to client if default operation should be bypassed, null otherwise
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 63080b9..b9fb811 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -41,10 +42,17 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -402,6 +410,201 @@ public class TestAsyncTable {
}
@Test
+ public void testCheckAndMutateWithSingleFilter() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+
+ // Put one row
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+ put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+ put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+ table.put(put).get();
+
+ // Put with success
+ boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .get();
+ assertTrue(ok);
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("b")))
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
+ .get();
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
+
+ // Delete with success
+ ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
+ .get();
+ assertTrue(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
+
+ // Mutate with success
+ ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+ CompareOperator.EQUAL, Bytes.toBytes("b")))
+ .thenMutate(new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
+ .get();
+ assertTrue(ok);
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
+ }
+
+ @Test
+ public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+
+ // Put one row
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+ put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+ put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+ table.put(put).get();
+
+ // Put with success
+ boolean ok = table.checkAndMutate(row, new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ))
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .get();
+ assertTrue(ok);
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(row, new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("c"))
+ ))
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
+ .get();
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
+
+ // Delete with success
+ ok = table.checkAndMutate(row, new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ))
+ .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
+ .get();
+ assertTrue(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
+
+ // Mutate with success
+ ok = table.checkAndMutate(row, new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ))
+ .thenMutate(new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
+ .get();
+ assertTrue(ok);
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
+ }
+
+ @Test
+ public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+
+ // Put with specifying the timestamp
+ table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
+
+ // Put with success
+ boolean ok = table.checkAndMutate(row, new FilterList(
+ new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+ new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+ new TimestampsFilter(Collections.singletonList(100L))
+ ))
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
+ .get();
+ assertTrue(ok);
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(row, new FilterList(
+ new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+ new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+ new TimestampsFilter(Collections.singletonList(101L))
+ ))
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
+ .get();
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
+ }
+
+ @Test
+ public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+
+ // Put with specifying the timestamp
+ table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
+ .get();
+
+ // Put with success
+ boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 101))
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
+ .get();
+ assertTrue(ok);
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 100))
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
+ .get();
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+ getTable.get().checkAndMutate(row, FAMILY)
+ .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+ }
+
+ @Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
index 15ef065..934c09c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
@@ -17,13 +17,24 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -184,4 +195,183 @@ public class TestCheckAndMutate {
}
}
+ @Test
+ public void testCheckAndMutateWithSingleFilter() throws Throwable {
+ try (Table table = createTable()) {
+ // put one row
+ putOneRow(table);
+ // get row back and assert the values
+ getOneRowAndAssertAllExist(table);
+
+ // Put with success
+ boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY,
+ Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+ assertTrue(ok);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("b")))
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
+
+ // Delete with success
+ ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")));
+ assertTrue(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
+
+ // Mutate with success
+ ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+ CompareOperator.EQUAL, Bytes.toBytes("b")))
+ .thenMutate(new RowMutations(ROWKEY)
+ .add((Mutation) new Put(ROWKEY)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
+ assertTrue(ok);
+
+ result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+ try (Table table = createTable()) {
+ // put one row
+ putOneRow(table);
+ // get row back and assert the values
+ getOneRowAndAssertAllExist(table);
+
+ // Put with success
+ boolean ok = table.checkAndMutate(ROWKEY, new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ))
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+ assertTrue(ok);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(ROWKEY, new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("c"))
+ ))
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
+
+ // Delete with success
+ ok = table.checkAndMutate(ROWKEY, new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ))
+ .thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")));
+ assertTrue(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
+
+ // Mutate with success
+ ok = table.checkAndMutate(ROWKEY, new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ))
+ .thenMutate(new RowMutations(ROWKEY)
+ .add((Mutation) new Put(ROWKEY)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
+ assertTrue(ok);
+
+ result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+ try (Table table = createTable()) {
+ // Put with specifying the timestamp
+ table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+ // Put with success
+ boolean ok = table.checkAndMutate(ROWKEY, new FilterList(
+ new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+ new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+ new TimestampsFilter(Collections.singletonList(100L))
+ ))
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+ assertTrue(ok);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(ROWKEY, new FilterList(
+ new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+ new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+ new TimestampsFilter(Collections.singletonList(101L))
+ ))
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+ try (Table table = createTable()) {
+ // Put with specifying the timestamp
+ table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+ // Put with success
+ boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY,
+ Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 101))
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+ assertTrue(ok);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 100))
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+ try (Table table = createTable()) {
+ table.checkAndMutate(ROWKEY, FAMILY)
+ .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index ef4ca25..43d72e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -30,12 +30,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -238,8 +238,7 @@ public class TestMalformedCellFromClient {
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
ClientProtos.Condition condition = RequestConverter
- .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
- HBaseProtos.CompareType.EQUAL, null);
+ .buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null);
for (Mutation mutation : rm.getMutations()) {
ClientProtos.MutationProto.MutationType mutateType = null;
if (mutation instanceof Put) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index caf0abb..523466d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -99,11 +100,17 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPostIncrement = new AtomicInteger(0);
final AtomicInteger ctPostAppend = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndPut = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndPutWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndPutAfterRowLock = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndPutWithFilterAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndPut = new AtomicInteger(0);
+ final AtomicInteger ctPostCheckAndPutWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndDeleteWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0);
+ final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
+ final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0);
final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0);
@@ -493,6 +500,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
+ public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ Filter filter, Put put, boolean result) throws IOException {
+ ctPreCheckAndPutWithFilter.incrementAndGet();
+ return true;
+ }
+
+ @Override
public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
ByteArrayComparable comparator, Put put, boolean result) throws IOException {
@@ -501,6 +515,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
+ public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+ byte[] row, Filter filter, Put put, boolean result) throws IOException {
+ ctPreCheckAndPutWithFilterAfterRowLock.incrementAndGet();
+ return true;
+ }
+
+ @Override
public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
Put put, boolean result) throws IOException {
@@ -509,6 +530,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
+ public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ Filter filter, Put put, boolean result) throws IOException {
+ ctPostCheckAndPutWithFilter.incrementAndGet();
+ return true;
+ }
+
+ @Override
public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
Delete delete, boolean result) throws IOException {
@@ -517,6 +545,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
+ public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ Filter filter, Delete delete, boolean result) throws IOException {
+ ctPreCheckAndDeleteWithFilter.incrementAndGet();
+ return true;
+ }
+
+ @Override
public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
@@ -525,6 +560,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
+ public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+ byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
+ ctPreCheckAndDeleteWithFilterAfterRowLock.incrementAndGet();
+ return true;
+ }
+
+ @Override
public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
Delete delete, boolean result) throws IOException {
@@ -533,6 +575,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
+ public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
+ Filter filter, Delete delete, boolean result) throws IOException {
+ ctPostCheckAndDeleteWithFilter.incrementAndGet();
+ return true;
+ }
+
+ @Override
public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
Append append) throws IOException {
ctPreAppendAfterRowLock.incrementAndGet();
@@ -693,28 +742,52 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPostCloseRegionOperation.get();
}
- public boolean hadPreCheckAndPut() {
- return ctPreCheckAndPut.get() > 0;
+ public int getPreCheckAndPut() {
+ return ctPreCheckAndPut.get();
+ }
+
+ public int getPreCheckAndPutWithFilter() {
+ return ctPreCheckAndPutWithFilter.get();
+ }
+
+ public int getPreCheckAndPutAfterRowLock() {
+ return ctPreCheckAndPutAfterRowLock.get();
+ }
+
+ public int getPreCheckAndPutWithFilterAfterRowLock() {
+ return ctPreCheckAndPutWithFilterAfterRowLock.get();
+ }
+
+ public int getPostCheckAndPut() {
+ return ctPostCheckAndPut.get();
+ }
+
+ public int getPostCheckAndPutWithFilter() {
+ return ctPostCheckAndPutWithFilter.get();
+ }
+
+ public int getPreCheckAndDelete() {
+ return ctPreCheckAndDelete.get();
}
- public boolean hadPreCheckAndPutAfterRowLock() {
- return ctPreCheckAndPutAfterRowLock.get() > 0;
+ public int getPreCheckAndDeleteWithFilter() {
+ return ctPreCheckAndDeleteWithFilter.get();
}
- public boolean hadPostCheckAndPut() {
- return ctPostCheckAndPut.get() > 0;
+ public int getPreCheckAndDeleteAfterRowLock() {
+ return ctPreCheckAndDeleteAfterRowLock.get();
}
- public boolean hadPreCheckAndDelete() {
- return ctPreCheckAndDelete.get() > 0;
+ public int getPreCheckAndDeleteWithFilterAfterRowLock() {
+ return ctPreCheckAndDeleteWithFilterAfterRowLock.get();
}
- public boolean hadPreCheckAndDeleteAfterRowLock() {
- return ctPreCheckAndDeleteAfterRowLock.get() > 0;
+ public int getPostCheckAndDelete() {
+ return ctPostCheckAndDelete.get();
}
- public boolean hadPostCheckAndDelete() {
- return ctPostCheckAndDelete.get() > 0;
+ public int getPostCheckAndDeleteWithFilter() {
+ return ctPostCheckAndDeleteWithFilter.get();
}
public boolean hadPreIncrement() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 8a12cd6..5b77027 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -263,12 +265,26 @@ public class TestRegionObserverInterface {
p = new Put(Bytes.toBytes(0));
p.addColumn(A, A, A);
verifyMethodResult(SimpleRegionObserver.class,
- new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
- tableName, new Boolean[] { false, false, false });
+ new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
+ "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
+ "getPostCheckAndPutWithFilter" },
+ tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
+
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
verifyMethodResult(SimpleRegionObserver.class,
- new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
- tableName, new Boolean[] { true, true, true });
+ new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
+ "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
+ "getPostCheckAndPutWithFilter" },
+ tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
+
+ table.checkAndMutate(Bytes.toBytes(0),
+ new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
+ .thenPut(p);
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
+ "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
+ "getPostCheckAndPutWithFilter" },
+ tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
} finally {
util.deleteTable(tableName);
}
@@ -285,14 +301,29 @@ public class TestRegionObserverInterface {
Delete d = new Delete(Bytes.toBytes(0));
table.delete(d);
verifyMethodResult(
- SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
- "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
- tableName, new Boolean[] { false, false, false });
+ SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
+ "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
+ "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
+ "getPostCheckAndDeleteWithFilter" },
+ tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
+
table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
verifyMethodResult(
- SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
- "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
- tableName, new Boolean[] { true, true, true });
+ SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
+ "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
+ "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
+ "getPostCheckAndDeleteWithFilter" },
+ tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
+
+ table.checkAndMutate(Bytes.toBytes(0),
+ new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
+ .thenDelete(d);
+ verifyMethodResult(
+ SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
+ "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
+ "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
+ "getPostCheckAndDeleteWithFilter" },
+ tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
} finally {
util.deleteTable(tableName);
table.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
index 37a1236..5a1315d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
/**
@@ -274,6 +275,11 @@ public class RegionAsTable implements Table {
}
@Override
+ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void mutateRow(RowMutations rm) throws IOException {
throw new UnsupportedOperationException();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8ed1c6b..f58b82a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -2149,6 +2150,128 @@ public class TestHRegion {
assertEquals(0, r.size());
}
+ @Test
+ public void testCheckAndMutate_WithFilters() throws Throwable {
+ final byte[] FAMILY = Bytes.toBytes("fam");
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+ // Put one row
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+ put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+ put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+ region.put(put);
+
+ // Put with success
+ boolean ok = region.checkAndMutate(row,
+ new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ),
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+ assertTrue(ok);
+
+ Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = region.checkAndMutate(row,
+ new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("c"))
+ ),
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
+ assertFalse(ok);
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
+
+ // Delete with success
+ ok = region.checkAndMutate(row,
+ new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ),
+ new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")));
+ assertTrue(ok);
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
+
+ // Mutate with success
+ ok = region.checkAndRowMutate(row,
+ new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))
+ ),
+ new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
+ assertTrue(ok);
+
+ result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
+ assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
+ }
+
+ @Test
+ public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
+ final byte[] FAMILY = Bytes.toBytes("fam");
+
+ // Setting up region
+ this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+ // Put with specifying the timestamp
+ region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+ // Put with success
+ boolean ok = region.checkAndMutate(row,
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ TimeRange.between(0, 101),
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+ assertTrue(ok);
+
+ Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure
+ ok = region.checkAndMutate(row,
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ TimeRange.between(0, 100),
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+ assertFalse(ok);
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
+
+ // Mutate with success
+ ok = region.checkAndRowMutate(row,
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ TimeRange.between(0, 101),
+ new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
+ assertTrue(ok);
+
+ result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
+ }
+
// ////////////////////////////////////////////////////////////////////////////
// Delete tests
// ////////////////////////////////////////////////////////////////////////////
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
index 6db9474..942e232 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
@@ -429,6 +430,11 @@ public class ThriftTable implements Table {
}
@Override
+ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+ throw new NotImplementedException("Implement later");
+ }
+
+ @Override
public void mutateRow(RowMutations rm) throws IOException {
TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
try {