You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2020/06/11 02:48:01 UTC
[hbase] branch master updated: HBASE-8458 Support for batch version
of checkAndMutate()
This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new efd5a96 HBASE-8458 Support for batch version of checkAndMutate()
efd5a96 is described below
commit efd5a962e5a6aa07dcba4b55f8b165ea3dbbd6ef
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Mon May 4 16:53:41 2020 +0900
HBASE-8458 Support for batch version of checkAndMutate()
Closes #1648
Signed-off-by: Josh Elser <el...@apache.org>
---
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 16 +-
.../org/apache/hadoop/hbase/client/AsyncTable.java | 47 ++
.../apache/hadoop/hbase/client/AsyncTableImpl.java | 11 +
.../apache/hadoop/hbase/client/CheckAndMutate.java | 362 ++++++++++
.../org/apache/hadoop/hbase/client/Mutation.java | 20 +-
.../hadoop/hbase/client/RawAsyncTableImpl.java | 62 +-
.../java/org/apache/hadoop/hbase/client/Table.java | 41 ++
.../hadoop/hbase/client/TableOverAsyncTable.java | 10 +
.../hbase/shaded/protobuf/RequestConverter.java | 215 +++---
.../hbase/shaded/protobuf/ResponseConverter.java | 21 +-
.../src/main/protobuf/client/Client.proto | 11 +-
.../hadoop/hbase/rest/client/RemoteHTable.java | 11 +
.../hadoop/hbase/regionserver/RSRpcServices.java | 262 ++++++--
.../hadoop/hbase/client/DummyAsyncTable.java | 10 +
.../apache/hadoop/hbase/client/TestAsyncTable.java | 732 ++++++++++++++++++++-
.../hadoop/hbase/client/TestAsyncTableBatch.java | 54 ++
.../hadoop/hbase/client/TestCheckAndMutate.java | 574 +++++++++++++++-
.../hadoop/hbase/client/TestFromClientSide3.java | 54 ++
.../hbase/client/TestMalformedCellFromClient.java | 5 +-
.../hadoop/hbase/thrift2/client/ThriftTable.java | 11 +
20 files changed, 2270 insertions(+), 259 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 464eff5..7e05b05 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -256,7 +256,7 @@ class AsyncBatchRpcRetryingCaller<T> {
}
private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
- List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
+ List<CellScannable> cells, Map<Integer, Integer> indexMap) throws IOException {
ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
@@ -264,14 +264,14 @@ class AsyncBatchRpcRetryingCaller<T> {
for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
// multiRequestBuilder will be populated with region actions.
- // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
+ // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
// action list.
RequestConverter.buildNoDataRegionActions(entry.getKey(),
entry.getValue().actions.stream()
.sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
.collect(Collectors.toList()),
- cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
- rowMutationsIndexMap);
+ cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder,
+ nonceGroup, indexMap);
}
return multiRequestBuilder.build();
}
@@ -367,10 +367,10 @@ class AsyncBatchRpcRetryingCaller<T> {
List<CellScannable> cells = new ArrayList<>();
// Map from a created RegionAction to the original index for a RowMutations within
// the original list of actions. This will be used to process the results when there
- // is RowMutations in the action list.
- Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
+ // is RowMutations/CheckAndMutate in the action list.
+ Map<Integer, Integer> indexMap = new HashMap<>();
try {
- req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
+ req = buildReq(serverReq.actionsByRegion, cells, indexMap);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;
@@ -387,7 +387,7 @@ class AsyncBatchRpcRetryingCaller<T> {
} else {
try {
onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req,
- rowMutationsIndexMap, resp, controller.cellScanner()));
+ indexMap, resp, controller.cellScanner()));
} catch (Exception e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index ce1c1dc..b2bb2f7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -231,12 +231,20 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* });
* </code>
* </pre>
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+ * any more.
*/
+ @Deprecated
CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
/**
* A helper class for sending checkAndMutate request.
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+ * any more.
*/
+ @Deprecated
interface CheckAndMutateBuilder {
/**
@@ -309,12 +317,20 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* });
* </code>
* </pre>
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+ * any more.
*/
+ @Deprecated
CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
/**
* A helper class for sending checkAndMutate request with a filter.
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+ * any more.
*/
+ @Deprecated
interface CheckAndMutateWithFilterBuilder {
/**
@@ -345,6 +361,37 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
}
/**
+ * checkAndMutate that atomically checks if a row matches the specified condition. If it does,
+ * it performs the specified action.
+ *
+ * @param checkAndMutate The CheckAndMutate object.
+ * @return A {@link CompletableFuture} that represents the result for the CheckAndMutate.
+ */
+ CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate);
+
+ /**
+ * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
+ * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
+ * atomically (and thus, each may fail independently of others).
+ *
+ * @param checkAndMutates The list of CheckAndMutate.
+ * @return A list of {@link CompletableFuture}s that represent the result for each
+ * CheckAndMutate.
+ */
+ List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates);
+
+ /**
+ * A simple version of batch checkAndMutate. It will fail if there are any failures.
+ *
+ * @param checkAndMutates The list of CheckAndMutate.
+ * @return A {@link CompletableFuture} that wraps the list of boolean results.
+ */
+ default CompletableFuture<List<Boolean>> checkAndMutateAll(
+ List<CheckAndMutate> checkAndMutates) {
+ return allOf(checkAndMutate(checkAndMutates));
+ }
+
+ /**
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.
* @param mutation object that specifies the set of mutations to perform atomically
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index afd0fac..53a020e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -206,6 +206,17 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
@Override
+ public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate) {
+ return wrap(rawTable.checkAndMutate(checkAndMutate));
+ }
+
+ @Override
+ public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
+ return rawTable.checkAndMutate(checkAndMutates).stream()
+ .map(this::wrap).collect(toList());
+ }
+
+ @Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
new file mode 100644
index 0000000..d596093
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete}
+ * and {@link RowMutations} are supported.
+ * <p>
+ * Use the builder class to instantiate a CheckAndMutate object.
+ * The builder offers a fluent-style API; typical usage looks like:
+ * <pre>
+ * <code>
+ * // A CheckAndMutate operation that performs the specified action if the column (specified by
+ * // the family and the qualifier) of the row equals the specified value
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ * .ifEquals(family, qualifier, value)
+ * .build(put);
+ *
+ * // A CheckAndMutate operation that performs the specified action if the column (specified by
+ * // the family and the qualifier) of the row doesn't exist
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ * .ifNotExists(family, qualifier)
+ * .build(put);
+ *
+ * // A CheckAndMutate operation that performs the specified action if the row matches the filter
+ * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
+ * .ifMatches(filter)
+ * .build(delete);
+ * </code>
+ * </pre>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class CheckAndMutate extends Mutation {
+
+ /**
+ * A builder class for building a CheckAndMutate object.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static final class Builder {
+ private final byte[] row;
+ private byte[] family;
+ private byte[] qualifier;
+ private CompareOperator op;
+ private byte[] value;
+ private Filter filter;
+ private TimeRange timeRange;
+
+ private Builder(byte[] row) {
+ this.row = Preconditions.checkNotNull(row, "row is null");
+ }
+
+ /**
+ * Check for lack of column
+ *
+ * @param family family to check
+ * @param qualifier qualifier to check
+ * @return this builder, to allow call chaining
+ */
+ public Builder ifNotExists(byte[] family, byte[] qualifier) {
+ return ifEquals(family, qualifier, null);
+ }
+
+ /**
+ * Check for equality
+ *
+ * @param family family to check
+ * @param qualifier qualifier to check
+ * @param value the expected value
+ * @return this builder, to allow call chaining
+ */
+ public Builder ifEquals(byte[] family, byte[] qualifier, byte[] value) {
+ return ifMatches(family, qualifier, CompareOperator.EQUAL, value);
+ }
+
+ /**
+ * @param family family to check
+ * @param qualifier qualifier to check
+ * @param compareOp comparison operator to use
+ * @param value the expected value
+ * @return this builder, to allow call chaining
+ */
+ public Builder ifMatches(byte[] family, byte[] qualifier, CompareOperator compareOp,
+ byte[] value) {
+ this.family = Preconditions.checkNotNull(family, "family is null");
+ this.qualifier = qualifier;
+ this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * @param filter filter to check
+ * @return this builder, to allow call chaining
+ */
+ public Builder ifMatches(Filter filter) {
+ this.filter = Preconditions.checkNotNull(filter, "filter is null");
+ return this;
+ }
+
+ /**
+ * @param timeRange time range to check
+ * @return this builder, to allow call chaining
+ */
+ public Builder timeRange(TimeRange timeRange) {
+ this.timeRange = timeRange;
+ return this;
+ }
+
+ private void preCheck(Row action) {
+ Preconditions.checkNotNull(action, "action (Put/Delete/RowMutations) is null");
+ if (!Bytes.equals(row, action.getRow())) {
+ throw new IllegalArgumentException("The row of the action (Put/Delete/RowMutations) <" +
+ Bytes.toStringBinary(action.getRow()) + "> doesn't match the original one <" +
+ Bytes.toStringBinary(this.row) + ">");
+ }
+ Preconditions.checkState(op != null || filter != null, "condition is null. You need to"
+ + " specify the condition by calling ifNotExists/ifEquals/ifMatches before building a"
+ + " CheckAndMutate object");
+ }
+
+ /**
+ * @param put data to put if check succeeds
+ * @return a CheckAndMutate object
+ */
+ public CheckAndMutate build(Put put) {
+ preCheck(put);
+ if (filter != null) {
+ return new CheckAndMutate(row, filter, timeRange, put);
+ } else {
+ return new CheckAndMutate(row, family, qualifier, op, value, timeRange, put);
+ }
+ }
+
+ /**
+ * @param delete data to delete if check succeeds
+ * @return a CheckAndMutate object
+ */
+ public CheckAndMutate build(Delete delete) {
+ preCheck(delete);
+ if (filter != null) {
+ return new CheckAndMutate(row, filter, timeRange, delete);
+ } else {
+ return new CheckAndMutate(row, family, qualifier, op, value, timeRange, delete);
+ }
+ }
+
+ /**
+ * @param mutation mutations to perform if check succeeds
+ * @return a CheckAndMutate object
+ */
+ public CheckAndMutate build(RowMutations mutation) {
+ preCheck(mutation);
+ if (filter != null) {
+ return new CheckAndMutate(row, filter, timeRange, mutation);
+ } else {
+ return new CheckAndMutate(row, family, qualifier, op, value, timeRange, mutation);
+ }
+ }
+ }
+
+ /**
+ * returns a builder object to build a CheckAndMutate object
+ *
+ * @param row row
+ * @return a builder object
+ */
+ public static Builder newBuilder(byte[] row) {
+ return new Builder(row);
+ }
+
+ private final byte[] family;
+ private final byte[] qualifier;
+ private final CompareOperator op;
+ private final byte[] value;
+ private final Filter filter;
+ private final TimeRange timeRange;
+ private final Row action;
+
+ private CheckAndMutate(byte[] row, byte[] family, byte[] qualifier,final CompareOperator op,
+ byte[] value, TimeRange timeRange, Row action) {
+ super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
+ this.family = family;
+ this.qualifier = qualifier;
+ this.op = op;
+ this.value = value;
+ this.filter = null;
+ this.timeRange = timeRange;
+ this.action = action;
+ }
+
+ private CheckAndMutate(byte[] row, Filter filter, TimeRange timeRange, Row action) {
+ super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
+ this.family = null;
+ this.qualifier = null;
+ this.op = null;
+ this.value = null;
+ this.filter = filter;
+ this.timeRange = timeRange;
+ this.action = action;
+ }
+
+ /**
+ * @return the family to check
+ */
+ public byte[] getFamily() {
+ return family;
+ }
+
+ /**
+ * @return the qualifier to check
+ */
+ public byte[] getQualifier() {
+ return qualifier;
+ }
+
+ /**
+ * @return the comparison operator
+ */
+ public CompareOperator getCompareOp() {
+ return op;
+ }
+
+ /**
+ * @return the expected value
+ */
+ public byte[] getValue() {
+ return value;
+ }
+
+ /**
+ * @return the filter to check
+ */
+ public Filter getFilter() {
+ return filter;
+ }
+
+ /**
+ * @return the time range to check
+ */
+ public TimeRange getTimeRange() {
+ return timeRange;
+ }
+
+ /**
+ * @return the action done if check succeeds
+ */
+ public Row getAction() {
+ return action;
+ }
+
+ @Override
+ public NavigableMap<byte[], List<Cell>> getFamilyCellMap() {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).getFamilyCellMap();
+ }
+ throw new UnsupportedOperationException();
+ }
+
+  /**
+   * Delegates to the wrapped action's cell builder, honoring the requested builder type.
+   *
+   * @param cellBuilderType the kind of cell builder the caller asked for
+   * @return the cell builder obtained from the wrapped {@link Mutation}
+   * @throws UnsupportedOperationException if the wrapped action is not a {@link Mutation}
+   */
+  @Override
+  public CellBuilder getCellBuilder(CellBuilderType cellBuilderType) {
+    if (action instanceof Mutation) {
+      // Forward the requested type; calling the no-arg overload here would discard the
+      // caller's choice of builder type.
+      return ((Mutation) action).getCellBuilder(cellBuilderType);
+    }
+    throw new UnsupportedOperationException();
+  }
+
+ @Override
+ public long getTimestamp() {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).getTimestamp();
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Mutation setTimestamp(long timestamp) {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).setTimestamp(timestamp);
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Durability getDurability() {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).getDurability();
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Mutation setDurability(Durability d) {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).setDurability(d);
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] getAttribute(String name) {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).getAttribute(name);
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OperationWithAttributes setAttribute(String name, byte[] value) {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).setAttribute(name, value);
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getPriority() {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).getPriority();
+ }
+ return ((RowMutations) action).getMaxPriority();
+ }
+
+ @Override
+ public OperationWithAttributes setPriority(int priority) {
+ if (action instanceof Mutation) {
+ return ((Mutation) action).setPriority(priority);
+ }
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 2bfa49d..d575d0b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -153,10 +153,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
* @return a list of Cell objects, returns an empty list if one doesn't exist.
*/
List<Cell> getCellList(byte[] family) {
- List<Cell> list = this.familyMap.get(family);
+ List<Cell> list = getFamilyCellMap().get(family);
if (list == null) {
list = new ArrayList<>();
- this.familyMap.put(family, list);
+ getFamilyCellMap().put(family, list);
}
return list;
}
@@ -205,11 +205,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
@Override
public Map<String, Object> getFingerprint() {
Map<String, Object> map = new HashMap<>();
- List<String> families = new ArrayList<>(this.familyMap.entrySet().size());
+ List<String> families = new ArrayList<>(getFamilyCellMap().entrySet().size());
// ideally, we would also include table information, but that information
// is not stored in each Operation instance.
map.put("families", families);
- for (Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
+ for (Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) {
families.add(Bytes.toStringBinary(entry.getKey()));
}
return map;
@@ -233,7 +233,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
map.put("row", Bytes.toStringBinary(this.row));
int colCount = 0;
// iterate through all column families affected
- for (Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
+ for (Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) {
// map from this family to details for each cell affected within the family
List<Map<String, Object>> qualifierDetails = new ArrayList<>();
columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
@@ -310,7 +310,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
* @return true if empty, false otherwise
*/
public boolean isEmpty() {
- return familyMap.isEmpty();
+ return getFamilyCellMap().isEmpty();
}
/**
@@ -441,7 +441,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
public int size() {
int size = 0;
- for (List<Cell> cells : this.familyMap.values()) {
+ for (List<Cell> cells : getFamilyCellMap().values()) {
size += cells.size();
}
return size;
@@ -451,7 +451,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
* @return the number of different families
*/
public int numFamilies() {
- return familyMap.size();
+ return getFamilyCellMap().size();
}
/**
@@ -465,8 +465,8 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
// Adding map overhead
heapsize +=
- ClassSize.align(this.familyMap.size() * ClassSize.MAP_ENTRY);
- for(Map.Entry<byte [], List<Cell>> entry : this.familyMap.entrySet()) {
+ ClassSize.align(getFamilyCellMap().size() * ClassSize.MAP_ENTRY);
+ for(Map.Entry<byte [], List<Cell>> entry : getFamilyCellMap().entrySet()) {
//Adding key overhead
heapsize +=
ClassSize.align(ClassSize.ARRAY + entry.getKey().length);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 0c86161..fa5f7cf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@@ -358,7 +359,6 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return new CheckAndMutateBuilderImpl(row, family);
}
-
private final class CheckAndMutateWithFilterBuilderImpl
implements CheckAndMutateWithFilterBuilder {
@@ -420,6 +420,54 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return new CheckAndMutateWithFilterBuilderImpl(row, filter);
}
+  /**
+   * Sends a single CheckAndMutate to the region hosting its row.
+   * <p>
+   * Put and Delete actions are sent as a conditional mutate request and the future completes
+   * with {@code getProcessed()} of the response. RowMutations actions are sent through the
+   * mutateRow path and the future completes with {@code getExists()} of the result. Any other
+   * action type completes the returned future exceptionally with a
+   * {@link DoNotRetryIOException}.
+   *
+   * @param checkAndMutate the condition together with the action to run when it holds
+   * @return a future holding the boolean outcome of the operation
+   */
+  @Override
+  public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate) {
+    if (checkAndMutate.getAction() instanceof Put ||
+      checkAndMutate.getAction() instanceof Delete) {
+      Mutation mutation = (Mutation) checkAndMutate.getAction();
+      if (mutation instanceof Put) {
+        // Validate the Put against the configured max key-value size exactly once, before the
+        // caller is built.
+        validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
+      }
+      return RawAsyncTableImpl.this.<Boolean> newCaller(checkAndMutate.getRow(),
+        mutation.getPriority(), rpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
+          loc, stub, mutation,
+          (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
+            checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+            checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
+            checkAndMutate.getTimeRange(), m),
+          (c, r) -> r.getProcessed()))
+        .call();
+    } else if (checkAndMutate.getAction() instanceof RowMutations) {
+      RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
+      return RawAsyncTableImpl.this.<Boolean> newCaller(checkAndMutate.getRow(),
+        rowMutations.getMaxPriority(), rpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
+          loc, stub, rowMutations,
+          (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
+            checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+            checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
+            checkAndMutate.getTimeRange(), rm),
+          resp -> resp.getExists()))
+        .call();
+    } else {
+      // Unsupported action type: report through the future rather than throwing.
+      CompletableFuture<Boolean> future = new CompletableFuture<>();
+      future.completeExceptionally(new DoNotRetryIOException(
+        "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
+      return future;
+    }
+  }
+
+ @Override
+ public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
+ return batch(checkAndMutates, rpcTimeoutNs).stream()
+ .map(f -> f.thenApply(r -> ((Result)r).getExists()))
+ .collect(toList());
+ }
+
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method.
private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
@@ -556,8 +604,16 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
- actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action)
- .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
+ for (Row action : actions) {
+ if (action instanceof Put) {
+ validatePut((Put) action, conn.connConf.getMaxKeyValueSize());
+ } else if (action instanceof CheckAndMutate) {
+ CheckAndMutate checkAndMutate = (CheckAndMutate) action;
+ if (checkAndMutate.getAction() instanceof Put) {
+ validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
+ }
+ }
+ }
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 870d83d..bcd045f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -307,14 +307,22 @@ public interface Table extends Closeable {
* table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put);
* </code>
* </pre>
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+ * any more.
*/
+ @Deprecated
default CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
throw new NotImplementedException("Add an implementation!");
}
/**
* A helper class for sending checkAndMutate request.
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+ * any more.
*/
+ @Deprecated
interface CheckAndMutateBuilder {
/**
@@ -377,14 +385,22 @@ public interface Table extends Closeable {
* table.checkAndMutate(row, filter).thenPut(put);
* </code>
* </pre>
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+ * any more.
*/
+ @Deprecated
default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
throw new NotImplementedException("Add an implementation!");
}
/**
* A helper class for sending checkAndMutate request with a filter.
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+ * any more.
*/
+ @Deprecated
interface CheckAndMutateWithFilterBuilder {
/**
@@ -412,6 +428,31 @@ public interface Table extends Closeable {
}
/**
+ * checkAndMutate that atomically checks if a row matches the specified condition. If it does,
+ * it performs the specified action.
+ *
+ * @param checkAndMutate The CheckAndMutate object.
+ * @return boolean that represents the result for the CheckAndMutate.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ default boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+ return checkAndMutate(Collections.singletonList(checkAndMutate))[0];
+ }
+
+ /**
+ * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
+ * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed
+ * atomically (and thus, each may fail independently of others).
+ *
+ * @param checkAndMutates The list of CheckAndMutate.
+ * @return An array of booleans that represent the result for each CheckAndMutate.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ default boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) throws IOException {
+ throw new NotImplementedException("Add an implementation!");
+ }
+
+ /**
* Performs multiple mutations atomically on a single row. Currently
* {@link Put} and {@link Delete} are supported.
*
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
index 841f8ba..d33cbe1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -300,6 +300,16 @@ class TableOverAsyncTable implements Table {
}
@Override
+ public boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+ return FutureUtils.get(table.checkAndMutate(checkAndMutate));
+ }
+
+ @Override
+ public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) throws IOException {
+ return Booleans.toArray(FutureUtils.get(table.checkAndMutateAll(checkAndMutates)));
+ }
+
+ @Override
public void mutateRow(RowMutations rm) throws IOException {
FutureUtils.get(table.mutateRow(rm));
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 74a0493..a524ed3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -194,37 +195,20 @@ public final class RequestConverter {
}
/**
- * Create a protocol buffer MutateRequest for a conditioned put
+ * Create a protocol buffer MutateRequest for a conditioned put/delete
*
* @return a mutate request
* @throws IOException
*/
- public static MutateRequest buildMutateRequest(
- final byte[] regionName, final byte[] row, final byte[] family,
- final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
- final TimeRange timeRange, final Put put) throws IOException {
- return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
- put, MutationType.PUT);
- }
-
- /**
- * Create a protocol buffer MutateRequest for a conditioned delete
- *
- * @return a mutate request
- * @throws IOException
- */
- public static MutateRequest buildMutateRequest(
- final byte[] regionName, final byte[] row, final byte[] family,
- final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
- final TimeRange timeRange, final Delete delete) throws IOException {
- return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
- delete, MutationType.DELETE);
- }
-
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
- final Filter filter, final TimeRange timeRange, final Mutation mutation,
- final MutationType type) throws IOException {
+ final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
+ MutationType type;
+ if (mutation instanceof Put) {
+ type = MutationType.PUT;
+ } else {
+ type = MutationType.DELETE;
+ }
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setMutation(ProtobufUtil.toMutation(type, mutation))
@@ -263,9 +247,8 @@ public final class RequestConverter {
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
- return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
- .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
- .build();
+ return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition(
+ buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build();
}
/**
@@ -383,42 +366,6 @@ public final class RequestConverter {
return builder;
}
- /**
- * Create a protocol buffer MultiRequest for row mutations that does not hold data. Data/Cells
- * are carried outside of protobuf. Return references to the Cells in <code>cells</code> param.
- * Does not propagate Action absolute position. Does not set atomic action on the created
- * RegionAtomic. Caller should do that if wanted.
- * @param regionName
- * @param rowMutations
- * @param cells Return in here a list of Cells as CellIterable.
- * @return a region mutation minus data
- * @throws IOException
- */
- public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
- final RowMutations rowMutations, final List<CellScannable> cells,
- final RegionAction.Builder regionActionBuilder,
- final ClientProtos.Action.Builder actionBuilder,
- final MutationProto.Builder mutationBuilder)
- throws IOException {
- for (Mutation mutation: rowMutations.getMutations()) {
- MutationType type = null;
- if (mutation instanceof Put) {
- type = MutationType.PUT;
- } else if (mutation instanceof Delete) {
- type = MutationType.DELETE;
- } else {
- throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
- mutation.getClass().getName());
- }
- mutationBuilder.clear();
- MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
- cells.add(mutation);
- actionBuilder.clear();
- regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
- }
- return regionActionBuilder;
- }
-
public static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
@@ -573,8 +520,8 @@ public final class RequestConverter {
* @param actionBuilder actionBuilder to be used to build action.
* @param mutationBuilder mutationBuilder to be used to build mutation.
* @param nonceGroup nonceGroup to be applied.
- * @param rowMutationsIndexMap Map of created RegionAction to the original index for a
- * RowMutations within the original list of actions
+ * @param indexMap Map of created RegionAction to the original index for a
+ * RowMutations/CheckAndMutate within the original list of actions
* @throws IOException
*/
public static void buildNoDataRegionActions(final byte[] regionName,
@@ -583,14 +530,14 @@ public final class RequestConverter {
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder,
- long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
+ long nonceGroup, final Map<Integer, Integer> indexMap) throws IOException {
regionActionBuilder.clear();
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
regionActionBuilder, regionName);
ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
- RegionAction.Builder rowMutationsRegionActionBuilder = null;
boolean hasNonce = false;
List<Action> rowMutationsList = new ArrayList<>();
+ List<Action> checkAndMutates = new ArrayList<>();
for (Action action: actions) {
Row row = action.getAction();
@@ -601,26 +548,9 @@ public final class RequestConverter {
Get g = (Get)row;
builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
} else if (row instanceof Put) {
- Put p = (Put)row;
- cells.add(p);
- builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p, mutationBuilder)));
+ buildNoDataRegionAction((Put) row, cells, builder, actionBuilder, mutationBuilder);
} else if (row instanceof Delete) {
- Delete d = (Delete)row;
- int size = d.size();
- // Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing
- // in it but the row to delete. In this case, the current implementation does not make
- // a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases
- // where the size returned is zero, we will send the Delete fully pb'd rather than have
- // metadata only in the pb and then send the kv along the side in cells.
- if (size > 0) {
- cells.add(d);
- builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d, mutationBuilder)));
- } else {
- builder.addAction(actionBuilder.
- setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d, mutationBuilder)));
- }
+ buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, mutationBuilder);
} else if (row instanceof Append) {
Append a = (Append)row;
cells.add(a);
@@ -651,6 +581,8 @@ public final class RequestConverter {
.setRequest(value)));
} else if (row instanceof RowMutations) {
rowMutationsList.add(action);
+ } else if (row instanceof CheckAndMutate) {
+ checkAndMutates.add(action);
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
@@ -666,23 +598,104 @@ public final class RequestConverter {
// on the one row. We do separate RegionAction for each RowMutations.
// We maintain a map to keep track of this RegionAction and the original Action index.
for (Action action : rowMutationsList) {
- RowMutations rms = (RowMutations) action.getAction();
- if (rowMutationsRegionActionBuilder == null) {
- rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder();
+ builder.clear();
+ getRegionActionBuilderWithRegion(builder, regionName);
+ actionBuilder.clear();
+ mutationBuilder.clear();
+
+ buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder,
+ mutationBuilder);
+ builder.setAtomic(true);
+
+ multiRequestBuilder.addRegionAction(builder.build());
+
+ // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
+ // in the overall multiRequest.
+ indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
+ }
+
+ // Process CheckAndMutate here. Similar to RowMutations, we do separate RegionAction for each
+ // CheckAndMutate and maintain a map to keep track of this RegionAction and the original
+ // Action index.
+ for (Action action : checkAndMutates) {
+ builder.clear();
+ getRegionActionBuilderWithRegion(builder, regionName);
+ actionBuilder.clear();
+ mutationBuilder.clear();
+
+ CheckAndMutate cam = (CheckAndMutate) action.getAction();
+ builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(),
+ cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange()));
+
+ if (cam.getAction() instanceof Put) {
+ buildNoDataRegionAction((Put) cam.getAction(), cells, builder, actionBuilder,
+ mutationBuilder);
+ } else if (cam.getAction() instanceof Delete) {
+ buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, actionBuilder,
+ mutationBuilder);
+ } else if (cam.getAction() instanceof RowMutations) {
+ buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
+ mutationBuilder);
+ builder.setAtomic(true);
} else {
- rowMutationsRegionActionBuilder.clear();
+ throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
+ cam.getAction().getClass().getName());
}
- rowMutationsRegionActionBuilder.setRegion(
- RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
- rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms,
- cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder);
- rowMutationsRegionActionBuilder.setAtomic(true);
- // Put it in the multiRequestBuilder
- multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
- // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
+
+ multiRequestBuilder.addRegionAction(builder.build());
+
+ // This CheckAndMutate region action is at (multiRequestBuilder.getRegionActionCount() - 1)
// in the overall multiRequest.
- rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
- action.getOriginalIndex());
+ indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
+ }
+ }
+
+ private static void buildNoDataRegionAction(final Put put, final List<CellScannable> cells,
+ final RegionAction.Builder regionActionBuilder,
+ final ClientProtos.Action.Builder actionBuilder,
+ final MutationProto.Builder mutationBuilder) throws IOException {
+ cells.add(put);
+ regionActionBuilder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, put, mutationBuilder)));
+ }
+
+ private static void buildNoDataRegionAction(final Delete delete,
+ final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
+ final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
+ throws IOException {
+ int size = delete.size();
+ // Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing
+ // in it but the row to delete. In this case, the current implementation does not make
+ // a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases
+ // where the size returned is zero, we will send the Delete fully pb'd rather than have
+ // metadata only in the pb and then send the kv along the side in cells.
+ if (size > 0) {
+ cells.add(delete);
+ regionActionBuilder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, delete, mutationBuilder)));
+ } else {
+ regionActionBuilder.addAction(actionBuilder.
+ setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, mutationBuilder)));
+ }
+ }
+
+ private static void buildNoDataRegionAction(final RowMutations rowMutations,
+ final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
+ final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
+ throws IOException {
+ for (Mutation mutation: rowMutations.getMutations()) {
+ MutationType type;
+ if (mutation instanceof Put) {
+ type = MutationType.PUT;
+ } else if (mutation instanceof Delete) {
+ type = MutationType.DELETE;
+ } else {
+ throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
+ mutation.getClass().getName());
+ }
+ MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation, mutationBuilder);
+ cells.add(mutation);
+ regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index d7378a6..19e6735 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -105,14 +105,14 @@ public final class ResponseConverter {
* Get the results from a protocol buffer MultiResponse
*
* @param request the original protocol buffer MultiRequest
- * @param rowMutationsIndexMap Used to support RowMutations in batch
+ * @param indexMap Used to support RowMutations/CheckAndMutate in batch
* @param response the protocol buffer MultiResponse to convert
* @param cells Cells to go with the passed in <code>proto</code>. Can be null.
* @return the results that were in the MultiResponse (a Result or an Exception).
* @throws IOException
*/
public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
- final Map<Integer, Integer> rowMutationsIndexMap, final MultiResponse response,
+ final Map<Integer, Integer> indexMap, final MultiResponse response,
final CellScanner cells) throws IOException {
int requestRegionActionCount = request.getRegionActionCount();
int responseRegionActionResultCount = response.getRegionActionResultCount();
@@ -149,18 +149,17 @@ public final class ResponseConverter {
Object responseValue;
- // For RowMutations action, if there is an exception, the exception is set
+ // For RowMutations/CheckAndMutate action, if there is an exception, the exception is set
// at the RegionActionResult level and the ResultOrException is null at the original index
- Integer rowMutationsIndex =
- (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i));
- if (rowMutationsIndex != null) {
- // This RegionAction is from a RowMutations in a batch.
+ Integer index = (indexMap == null ? null : indexMap.get(i));
+ if (index != null) {
+ // This RegionAction is from a RowMutations/CheckAndMutate in a batch.
// If there is an exception from the server, the exception is set at
// the RegionActionResult level, which has been handled above.
- responseValue = response.getProcessed() ?
+ responseValue = actionResult.getProcessed() ?
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
- results.add(regionName, rowMutationsIndex, responseValue);
+ results.add(regionName, index, responseValue);
continue;
}
@@ -171,11 +170,11 @@ public final class ResponseConverter {
responseValue = ProtobufUtil.toResult(roe.getResult(), cells);
} else if (roe.hasServiceResult()) {
responseValue = roe.getServiceResult();
- } else{
+ } else {
// Sometimes, the response is just "it was processed". Generally, this occurs for things
// like mutateRows where either we get back 'processed' (or not) and optionally some
// statistics about the regions we touched.
- responseValue = response.getProcessed() ?
+ responseValue = actionResult.getProcessed() ?
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
index fbb0769..7678211 100644
--- a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto
@@ -455,6 +455,7 @@ message RegionAction {
// When set, run mutations as atomic unit.
optional bool atomic = 2;
repeated Action action = 3;
+ optional Condition condition = 4;
}
/*
@@ -499,6 +500,7 @@ message RegionActionResult {
repeated ResultOrException resultOrException = 1;
// If the operation failed globally for this region, this exception is set
optional NameBytesPair exception = 2;
+ optional bool processed = 3;
}
/**
@@ -511,13 +513,16 @@ message RegionActionResult {
message MultiRequest {
repeated RegionAction regionAction = 1;
optional uint64 nonceGroup = 2;
- optional Condition condition = 3;
+ // Moved this to RegionAction in HBASE-8458. Keep it for backward compatibility. Need to remove
+ // it in the future.
+ optional Condition condition = 3 [deprecated=true];
}
message MultiResponse {
repeated RegionActionResult regionActionResult = 1;
- // used for mutate to indicate processed only
- optional bool processed = 2;
+ // Moved this to RegionActionResult in HBASE-8458. Keep it for backward compatibility. Need to
+ // remove it in the future.
+ optional bool processed = 2 [deprecated=true];
optional MultiRegionLoadStats regionStatistics = 3;
}
diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index aaf1954..d68ed58 100644
--- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -745,6 +746,16 @@ public class RemoteHTable implements Table {
}
@Override
+ public boolean checkAndMutate(CheckAndMutate checkAndMutate) {
+ throw new NotImplementedException("Implement later");
+ }
+
+ @Override
+ public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) {
+ throw new NotImplementedException("Implement later");
+ }
+
+ @Override
public Result increment(Increment increment) throws IOException {
throw new IOException("Increment not supported");
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 1890a4d..44f4d02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2814,45 +2814,124 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
- // this will contain all the cells that we need to return. It's created later, if needed.
- List<CellScannable> cellsToReturn = null;
MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
- Boolean processed = null;
- RegionScannersCloseCallBack closeCallBack = null;
- RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
this.rpcMultiRequestCount.increment();
this.requestCount.increment();
- Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
- .getRegionActionCount());
ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
- for (RegionAction regionAction : request.getRegionActionList()) {
+
+ // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
+ // following logic is for backward compatibility as old clients still use
+ // MultiRequest#condition in case of checkAndMutate with RowMutations.
+ if (request.hasCondition()) {
+ if (request.getRegionActionList().isEmpty()) {
+ // If the region action list is empty, do nothing.
+ responseBuilder.setProcessed(true);
+ return responseBuilder.build();
+ }
+
+ RegionAction regionAction = request.getRegionAction(0);
+
+ // When request.hasCondition() is true, regionAction.getAtomic() should always be true. So
+ // we can assume regionAction.getAtomic() is true here.
+ assert regionAction.getAtomic();
+
OperationQuota quota;
HRegion region;
- regionActionResultBuilder.clear();
RegionSpecifier regionSpecifier = regionAction.getRegion();
+
try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
- continue; // For this region it's a failure.
+ return responseBuilder.build();
}
- boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
- if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+
+ try {
+ boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
// We only allow replication in standby state and it will not set the atomic flag.
if (rejectIfFromClient) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
- new DoNotRetryIOException(
- region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
- quota.close();
- continue;
+ new DoNotRetryIOException(region.getRegionInfo().getRegionNameAsString()
+ + " is in STANDBY state"));
+ return responseBuilder.build();
}
- // How does this call happen? It may need some work to play well w/ the surroundings.
- // Need to return an item per Action along w/ Action index. TODO.
+
try {
- if (request.hasCondition()) {
- Condition condition = request.getCondition();
+ Condition condition = request.getCondition();
+ byte[] row = condition.getRow().toByteArray();
+ byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+ byte[] qualifier =
+ condition.hasQualifier() ? condition.getQualifier().toByteArray() : null;
+ CompareOperator op = condition.hasCompareType() ?
+ CompareOperator.valueOf(condition.getCompareType().name()) :
+ null;
+ ByteArrayComparable comparator = condition.hasComparator() ?
+ ProtobufUtil.toComparator(condition.getComparator()) : null;
+ Filter filter =
+ condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
+ TimeRange timeRange = condition.hasTimeRange() ?
+ ProtobufUtil.toTimeRange(condition.getTimeRange()) :
+ TimeRange.allTime();
+ boolean processed =
+ checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
+ qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
+ spaceQuotaEnforcement);
+ responseBuilder.setProcessed(processed);
+ } catch (IOException e) {
+ rpcServer.getMetrics().exception(e);
+ // As it's an atomic operation with a condition, we may expect it's a global failure.
+ regionActionResultBuilder.setException(ResponseConverter.buildException(e));
+ }
+ } finally {
+ quota.close();
+ }
+
+ responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+ ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
+ if (regionLoadStats != null) {
+ responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
+ .addRegion(regionSpecifier).addStat(regionLoadStats).build());
+ }
+ return responseBuilder.build();
+ }
+
+ // this will contain all the cells that we need to return. It's created later, if needed.
+ List<CellScannable> cellsToReturn = null;
+ RegionScannersCloseCallBack closeCallBack = null;
+ RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
+ Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
+ .getRegionActionCount());
+
+ for (RegionAction regionAction : request.getRegionActionList()) {
+ OperationQuota quota;
+ HRegion region;
+ RegionSpecifier regionSpecifier = regionAction.getRegion();
+ regionActionResultBuilder.clear();
+
+ try {
+ region = getRegion(regionSpecifier);
+ quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
+ } catch (IOException e) {
+ failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
+ continue; // For this region it's a failure.
+ }
+
+ try {
+ boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
+
+ if (regionAction.hasCondition()) {
+ // We only allow replication in standby state and it will not set the atomic flag.
+ if (rejectIfFromClient) {
+ failRegionAction(responseBuilder, regionActionResultBuilder, regionAction,
+ cellScanner, new DoNotRetryIOException(region.getRegionInfo()
+ .getRegionNameAsString() + " is in STANDBY state"));
+ continue;
+ }
+
+ try {
+ Condition condition = regionAction.getCondition();
byte[] row = condition.getRow().toByteArray();
byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
byte[] qualifier = condition.hasQualifier() ?
@@ -2864,46 +2943,119 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Filter filter = condition.hasFilter() ?
ProtobufUtil.toFilter(condition.getFilter()) : null;
TimeRange timeRange = condition.hasTimeRange() ?
- ProtobufUtil.toTimeRange(condition.getTimeRange()) :
- TimeRange.allTime();
- processed =
- checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
- qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
- spaceQuotaEnforcement);
- } else {
+ ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
+
+ boolean processed;
+ if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+ // RowMutations
+ processed =
+ checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
+ qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
+ spaceQuotaEnforcement);
+ } else {
+ if (regionAction.getActionList().isEmpty()) {
+ // If the region action list is empty, do nothing.
+ regionActionResultBuilder.setProcessed(true);
+ continue;
+ }
+ Action action = regionAction.getAction(0);
+ if (action.hasGet()) {
+ throw new DoNotRetryIOException("CheckAndMutate doesn't support GET="
+ + action.getGet());
+ }
+ MutationProto mutation = action.getMutation();
+ switch (mutation.getMutateType()) {
+ case PUT:
+ Put put = ProtobufUtil.toPut(mutation, cellScanner);
+ checkCellSizeLimit(region, put);
+ // Throws an exception when violated
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
+ quota.addMutation(put);
+
+ if (filter != null) {
+ processed = region.checkAndMutate(row, filter, timeRange, put);
+ } else {
+ processed = region.checkAndMutate(row, family, qualifier, op, comparator,
+ timeRange, put);
+ }
+ break;
+
+ case DELETE:
+ Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+ checkCellSizeLimit(region, delete);
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
+ quota.addMutation(delete);
+
+ if (filter != null) {
+ processed = region.checkAndMutate(row, filter, timeRange, delete);
+ } else {
+ processed = region.checkAndMutate(row, family, qualifier, op, comparator,
+ timeRange, delete);
+ }
+ break;
+
+ default:
+ throw new DoNotRetryIOException("CheckAndMutate doesn't support "
+ + mutation.getMutateType());
+ }
+
+ // To unify the response format with doNonAtomicRegionMutation and read through
+ // client's AsyncProcess we have to add an empty result instance per operation
+ regionActionResultBuilder.addResultOrException(
+ ClientProtos.ResultOrException.newBuilder().setIndex(0).build());
+ }
+ regionActionResultBuilder.setProcessed(processed);
+ } catch (IOException e) {
+ rpcServer.getMetrics().exception(e);
+ // As it's an atomic operation with a condition, we may expect it's a global failure.
+ regionActionResultBuilder.setException(ResponseConverter.buildException(e));
+ }
+ } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+ // We only allow replication in standby state and it will not set the atomic flag.
+ if (rejectIfFromClient) {
+ failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
+ new DoNotRetryIOException(region.getRegionInfo().getRegionNameAsString()
+ + " is in STANDBY state"));
+ continue;
+ }
+ try {
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
cellScanner, spaceQuotaEnforcement);
- processed = Boolean.TRUE;
+ regionActionResultBuilder.setProcessed(true);
+ // We no longer use MultiResponse#processed. Instead, we use
+ // RegionActionResult#processed. This is for backward compatibility for old clients.
+ responseBuilder.setProcessed(true);
+ } catch (IOException e) {
+ rpcServer.getMetrics().exception(e);
+ // As it's atomic, we may expect it's a global failure.
+ regionActionResultBuilder.setException(ResponseConverter.buildException(e));
}
- } catch (IOException e) {
- rpcServer.getMetrics().exception(e);
- // As it's atomic, we may expect it's a global failure.
- regionActionResultBuilder.setException(ResponseConverter.buildException(e));
- }
- } else {
- if (rejectIfFromClient && regionAction.getActionCount() > 0 &&
- !isReplicationRequest(regionAction.getAction(0))) {
- // fail if it is not a replication request
- failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
- new DoNotRetryIOException(
- region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
- quota.close();
- continue;
- }
- // doNonAtomicRegionMutation manages the exception internally
- if (context != null && closeCallBack == null) {
- // An RpcCallBack that creates a list of scanners that needs to perform callBack
- // operation on completion of multiGets.
- // Set this only once
- closeCallBack = new RegionScannersCloseCallBack();
- context.setCallBack(closeCallBack);
- }
- cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
+ } else {
+ if (rejectIfFromClient && regionAction.getActionCount() > 0 && !isReplicationRequest(
+ regionAction.getAction(0))) {
+ // fail if it is not a replication request
+ failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
+ new DoNotRetryIOException(region.getRegionInfo().getRegionNameAsString()
+ + " is in STANDBY state"));
+ continue;
+ }
+ // doNonAtomicRegionMutation manages the exception internally
+ if (context != null && closeCallBack == null) {
+ // An RpcCallBack that creates a list of scanners that needs to perform callBack
+ // operation on completion of multiGets.
+ // Set this only once
+ closeCallBack = new RegionScannersCloseCallBack();
+ context.setCallBack(closeCallBack);
+ }
+ cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
spaceQuotaEnforcement);
+ }
+ } finally {
+ quota.close();
}
+
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
- quota.close();
ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
if (regionLoadStats != null) {
regionStats.put(regionSpecifier, regionLoadStats);
@@ -2914,10 +3066,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
}
- if (processed != null) {
- responseBuilder.setProcessed(processed);
- }
-
MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
for(Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat: regionStats.entrySet()){
builder.addRegion(stat.getKey());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
index 964e929..b545208 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
@@ -113,6 +113,16 @@ public class DummyAsyncTable<C extends ScanResultConsumerBase> implements AsyncT
}
@Override
+ public CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate) {
+ return null;
+ }
+
+ @Override
+ public List<CompletableFuture<Boolean>> checkAndMutate(List<CheckAndMutate> checkAndMutates) {
+ return null;
+ }
+
+ @Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index b9fb811..0de1892 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -246,9 +246,30 @@ public class TestAsyncTable {
assertArrayEquals(IntStream.range(0, count).toArray(), actual);
}
+ @Test
+ public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
+ AsyncTable<?> table = getTable.get();
+ RowMutations mutation = new RowMutations(row);
+ mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
+ table.mutateRow(mutation).get();
+ Result result = table.get(new Get(row)).get();
+ assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
+
+ mutation = new RowMutations(row);
+ mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
+ mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
+ table.mutateRow(mutation).get();
+ result = table.get(new Get(row)).get();
+ assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
+ assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
+ }
+
+ // Tests for old checkAndMutate API
+
@SuppressWarnings("FutureReturnValueIgnored")
@Test
- public void testCheckAndPut() throws InterruptedException, ExecutionException {
+ @Deprecated
+ public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
@@ -271,7 +292,8 @@ public class TestAsyncTable {
@SuppressWarnings("FutureReturnValueIgnored")
@Test
- public void testCheckAndDelete() throws InterruptedException, ExecutionException {
+ @Deprecated
+ public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
@@ -307,27 +329,10 @@ public class TestAsyncTable {
});
}
- @Test
- public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
- AsyncTable<?> table = getTable.get();
- RowMutations mutation = new RowMutations(row);
- mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
- table.mutateRow(mutation).get();
- Result result = table.get(new Get(row)).get();
- assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
-
- mutation = new RowMutations(row);
- mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
- mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
- table.mutateRow(mutation).get();
- result = table.get(new Get(row)).get();
- assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
- assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
- }
-
@SuppressWarnings("FutureReturnValueIgnored")
@Test
- public void testCheckAndMutate() throws InterruptedException, ExecutionException {
+ @Deprecated
+ public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
@@ -371,7 +376,8 @@ public class TestAsyncTable {
}
@Test
- public void testCheckAndMutateWithTimeRange() throws Exception {
+ @Deprecated
+ public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
AsyncTable<?> table = getTable.get();
final long ts = System.currentTimeMillis() / 2;
Put put = new Put(row);
@@ -390,6 +396,7 @@ public class TestAsyncTable {
assertTrue(ok);
RowMutations rm = new RowMutations(row).add((Mutation) put);
+
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenMutate(rm).get();
assertFalse(ok);
@@ -410,7 +417,8 @@ public class TestAsyncTable {
}
@Test
- public void testCheckAndMutateWithSingleFilter() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
@@ -465,7 +473,8 @@ public class TestAsyncTable {
}
@Test
- public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
@@ -536,7 +545,8 @@ public class TestAsyncTable {
}
@Test
- public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
@@ -569,7 +579,8 @@ public class TestAsyncTable {
}
@Test
- public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
@@ -599,11 +610,678 @@ public class TestAsyncTable {
}
@Test(expected = NullPointerException.class)
- public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithoutConditionForOldApi() {
getTable.get().checkAndMutate(row, FAMILY)
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
+ // Tests for new CheckAndMutate API
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ @Test
+ public void testCheckAndPut() throws InterruptedException, ExecutionException {
+ AsyncTable<?> table = getTable.get();
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger successIndex = new AtomicInteger(-1);
+ int count = 10;
+ CountDownLatch latch = new CountDownLatch(count);
+
+ IntStream.range(0, count)
+ .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER)
+ .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))))
+ .thenAccept(x -> {
+ if (x) {
+ successCount.incrementAndGet();
+ successIndex.set(i);
+ }
+ latch.countDown();
+ }));
+ latch.await();
+ assertEquals(1, successCount.get());
+ String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
+ assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ @Test
+ public void testCheckAndDelete() throws InterruptedException, ExecutionException {
+ AsyncTable<?> table = getTable.get();
+ int count = 10;
+ CountDownLatch putLatch = new CountDownLatch(count + 1);
+ table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
+ IntStream.range(0, count)
+ .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
+ .thenRun(() -> putLatch.countDown()));
+ putLatch.await();
+
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger successIndex = new AtomicInteger(-1);
+ CountDownLatch deleteLatch = new CountDownLatch(count);
+
+ IntStream.range(0, count)
+ .forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, QUALIFIER, VALUE)
+ .build(
+ new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
+ .thenAccept(x -> {
+ if (x) {
+ successCount.incrementAndGet();
+ successIndex.set(i);
+ }
+ deleteLatch.countDown();
+ }));
+ deleteLatch.await();
+ assertEquals(1, successCount.get());
+ Result result = table.get(new Get(row)).get();
+ IntStream.range(0, count).forEach(i -> {
+ if (i == successIndex.get()) {
+ assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
+ } else {
+ assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
+ }
+ });
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ @Test
+ public void testCheckAndMutate() throws InterruptedException, ExecutionException {
+ AsyncTable<?> table = getTable.get();
+ int count = 10;
+ CountDownLatch putLatch = new CountDownLatch(count + 1);
+ table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
+ IntStream.range(0, count)
+ .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
+ .thenRun(() -> putLatch.countDown()));
+ putLatch.await();
+
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger successIndex = new AtomicInteger(-1);
+ CountDownLatch mutateLatch = new CountDownLatch(count);
+ IntStream.range(0, count).forEach(i -> {
+ RowMutations mutation = new RowMutations(row);
+ try {
+ mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
+ mutation
+ .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, QUALIFIER, VALUE)
+ .build(mutation))
+ .thenAccept(x -> {
+ if (x) {
+ successCount.incrementAndGet();
+ successIndex.set(i);
+ }
+ mutateLatch.countDown();
+ });
+ });
+ mutateLatch.await();
+ assertEquals(1, successCount.get());
+ Result result = table.get(new Get(row)).get();
+ IntStream.range(0, count).forEach(i -> {
+ if (i == successIndex.get()) {
+ assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
+ } else {
+ assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
+ }
+ });
+ }
+
+ @Test
+ public void testCheckAndMutateWithTimeRange() throws Exception {
+ AsyncTable<?> table = getTable.get();
+ final long ts = System.currentTimeMillis() / 2;
+ Put put = new Put(row);
+ put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
+
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER)
+ .build(put)).get();
+ assertTrue(ok);
+
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, QUALIFIER, VALUE)
+ .timeRange(TimeRange.at(ts + 10000))
+ .build(put)).get();
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, QUALIFIER, VALUE)
+ .timeRange(TimeRange.at(ts))
+ .build(put)).get();
+ assertTrue(ok);
+
+ RowMutations rm = new RowMutations(row).add((Mutation) put);
+
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, QUALIFIER, VALUE)
+ .timeRange(TimeRange.at(ts + 10000))
+ .build(rm)).get();
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, QUALIFIER, VALUE)
+ .timeRange(TimeRange.at(ts))
+ .build(rm)).get();
+ assertTrue(ok);
+
+ Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
+
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, QUALIFIER, VALUE)
+ .timeRange(TimeRange.at(ts + 10000))
+ .build(delete)).get();
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, QUALIFIER, VALUE)
+ .timeRange(TimeRange.at(ts))
+ .build(delete)).get();
+ assertTrue(ok);
+ }
+
+ @Test
+ public void testCheckAndMutateWithSingleFilter() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+
+ // Put one row
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+ put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+ put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+ table.put(put).get();
+
+ // Put with success
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
+ assertTrue(ok);
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("b")))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
+
+ // Delete with success
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
+ assertTrue(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
+
+ // Mutate with success
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+ CompareOperator.EQUAL, Bytes.toBytes("b")))
+ .build(new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
+ assertTrue(ok);
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
+ }
+
+ @Test
+ public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+
+ // Put one row
+ Put put = new Put(row);
+ put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+ put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+ put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+ table.put(put).get();
+
+ // Put with success
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
+ assertTrue(ok);
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("c"))))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
+
+ // Delete with success
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
+ assertTrue(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
+
+ // Mutate with success
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
+ assertTrue(ok);
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
+ }
+
+ @Test
+ public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+
+ // Put with specifying the timestamp
+ table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
+
+ // Put with success
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+ new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+ new TimestampsFilter(Collections.singletonList(100L))))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
+ assertTrue(ok);
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+ new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+ new TimestampsFilter(Collections.singletonList(101L))))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
+ }
+
+ @Test
+ public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+
+ // Put with specifying the timestamp
+ table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
+ .get();
+
+ // Put with success
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 101))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
+ assertTrue(ok);
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 100))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))))
+ .get();
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
+ }
+
+ // Tests for batch version of checkAndMutate
+
+ @Test
+ public void testCheckAndMutateBatch() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+ byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
+ byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
+
+ table.putAll(Arrays.asList(
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+ new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+ new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+ new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
+
+ // Test for Put
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
+
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
+ .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
+
+ List<Boolean> results =
+ table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
+ assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Test for Delete
+ checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
+ .build(new Delete(row));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
+ .build(new Delete(row2));
+
+ results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Test for RowMutations
+ checkAndMutate1 = CheckAndMutate.newBuilder(row3)
+ .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+ .build(new RowMutations(row3)
+ .add((Mutation) new Put(row3)
+ .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
+ .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(row4)
+ .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
+ .build(new RowMutations(row4)
+ .add((Mutation) new Put(row4)
+ .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
+ .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D"))));
+
+ results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ result = table.get(new Get(row3)).get();
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+ assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); // "C" is what the Delete removed
+
+ result = table.get(new Get(row4)).get();
+ assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+ }
+
+ @Test
+ public void testCheckAndMutateBatch2() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+ byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
+ byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
+
+ table.putAll(Arrays.asList(
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+ new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+ new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
+ new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))).get();
+
+ // Test for ifNotExists()
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, Bytes.toBytes("B"))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
+
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifNotExists(FAMILY, Bytes.toBytes("B"))
+ .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
+
+ List<Boolean> results =
+ table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
+ assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Test for ifMatches()
+ checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
+ .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
+
+ results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
+ assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Test for timeRange()
+ checkAndMutate1 = CheckAndMutate.newBuilder(row3)
+ .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+ .timeRange(TimeRange.between(0, 101))
+ .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(row4)
+ .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
+ .timeRange(TimeRange.between(0, 100))
+ .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
+
+ results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
+ assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+
+ result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+ }
+
+ @Test
+ public void testCheckAndMutateBatchWithFilter() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+ table.putAll(Arrays.asList(
+ new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
+ .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+ new Put(row2)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
+ .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
+ .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
+
+ // Test for Put
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
+
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
+
+ List<Boolean> results =
+ table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
+ assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+
+ // Test for Delete
+ checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
+
+ results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+
+ // Test for RowMutations
+ checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new RowMutations(row)
+ .add((Mutation) new Put(row)
+ .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
+ .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new RowMutations(row2)
+ .add((Mutation) new Put(row2)
+ .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
+ .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D"))));
+
+ results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ result = table.get(new Get(row)).get();
+ assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
+ assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+
+ result = table.get(new Get(row2)).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+ }
+
+ @Test
+ public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
+ AsyncTable<?> table = getTable.get();
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+ table.putAll(Arrays.asList(
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
+ .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
+ .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+ new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
+ .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
+ .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
+
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .timeRange(TimeRange.between(0, 101))
+ .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
+
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
+ Bytes.toBytes("d")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
+ Bytes.toBytes("e"))))
+ .timeRange(TimeRange.between(0, 100))
+ .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
+
+ List<Boolean> results =
+ table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0));
+ assertFalse(results.get(1));
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
+ assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+ }
+
@Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index 42e61d7..ac82314 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -333,4 +334,57 @@ public class TestAsyncTableBatch {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
+
+ @Test
+ public void testWithCheckAndMutate() throws Exception {
+ AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
+ // Mixes CheckAndMutate with Get, RowMutations and Put in one batchAll call and checks every slot.
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] row2 = Bytes.toBytes("row2");
+ byte[] row3 = Bytes.toBytes("row3");
+ byte[] row4 = Bytes.toBytes("row4");
+ byte[] row5 = Bytes.toBytes("row5");
+ // Seed one column per row.
+ table.putAll(Arrays.asList(
+ new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+ new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+ new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+ new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
+ new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
+ // checkAndMutate1 compares A with its seeded value "a"; checkAndMutate2 compares D ("d") with "a".
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g")));
+ Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
+ RowMutations mutations = new RowMutations(row3)
+ .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
+ .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")));
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
+ .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
+ .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
+ Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
+
+ List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
+ List<Object> results = table.batchAll(actions).get();
+ // CheckAndMutate and RowMutations outcomes are read via getExists(); the Put slot is an empty Result.
+ assertTrue(((Result) results.get(0)).getExists());
+ assertEquals("b",
+ Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B"))));
+ assertTrue(((Result) results.get(2)).getExists());
+ assertFalse(((Result) results.get(3)).getExists());
+ assertTrue(((Result) results.get(4)).isEmpty());
+ // Verify stored data: row1 updated, row3 mutated, row4 left alone, row5 overwritten.
+ Result result = table.get(new Get(row1)).get();
+ assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(row3)).get();
+ assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+
+ result = table.get(new Get(row4)).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ result = table.get(new Get(row5)).get();
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
index f399e86..f88c769 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -57,6 +59,9 @@ public class TestCheckAndMutate {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] ROWKEY = Bytes.toBytes("12345");
+ private static final byte[] ROWKEY2 = Bytes.toBytes("67890");
+ private static final byte[] ROWKEY3 = Bytes.toBytes("abcde");
+ private static final byte[] ROWKEY4 = Bytes.toBytes("fghij");
private static final byte[] FAMILY = Bytes.toBytes("cf");
@Rule
@@ -131,39 +136,11 @@ public class TestCheckAndMutate {
return rm;
}
- @Test
- public void testCheckAndMutate() throws Throwable {
- try (Table table = createTable()) {
- // put one row
- putOneRow(table);
- // get row back and assert the values
- getOneRowAndAssertAllExist(table);
-
- // put the same row again with C column deleted
- RowMutations rm = makeRowMutationsWithColumnCDeleted();
- boolean res = table.checkAndMutate(ROWKEY, FAMILY).qualifier(Bytes.toBytes("A"))
- .ifEquals(Bytes.toBytes("a")).thenMutate(rm);
- assertTrue(res);
-
- // get row back and assert the values
- getOneRowAndAssertAllButCExist(table);
-
- // Test that we get a region level exception
- try {
- rm = getBogusRowMutations();
- table.checkAndMutate(ROWKEY, FAMILY).qualifier(Bytes.toBytes("A"))
- .ifEquals(Bytes.toBytes("a")).thenMutate(rm);
- fail("Expected NoSuchColumnFamilyException");
- } catch (NoSuchColumnFamilyException e) {
- // expected
- } catch (RetriesExhaustedException e) {
- assertThat(e.getCause(), instanceOf(NoSuchColumnFamilyException.class));
- }
- }
- }
+ // Tests for old checkAndMutate API
@Test
- public void testCheckAndMutateWithBuilder() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateForOldApi() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
@@ -194,7 +171,8 @@ public class TestCheckAndMutate {
}
@Test
- public void testCheckAndMutateWithSingleFilter() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
@@ -243,7 +221,8 @@ public class TestCheckAndMutate {
}
@Test
- public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
try (Table table = createTable()) {
// put one row
putOneRow(table);
@@ -308,7 +287,8 @@ public class TestCheckAndMutate {
}
@Test
- public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
try (Table table = createTable()) {
// Put with specifying the timestamp
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
@@ -339,7 +319,8 @@ public class TestCheckAndMutate {
}
@Test
- public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
try (Table table = createTable()) {
// Put with specifying the timestamp
table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
@@ -366,10 +347,531 @@ public class TestCheckAndMutate {
}
@Test(expected = NullPointerException.class)
- public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+ @Deprecated
+ public void testCheckAndMutateWithoutConditionForOldApi() throws Throwable {
try (Table table = createTable()) {
table.checkAndMutate(ROWKEY, FAMILY)
.thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
}
+
+ // Tests for new CheckAndMutate API
+
+ @Test
+ public void testCheckAndMutate() throws Throwable {
+ try (Table table = createTable()) {
+ // put one row
+ putOneRow(table);
+ // get row back and assert the values
+ getOneRowAndAssertAllExist(table);
+
+ // put the same row again with C column deleted
+ RowMutations rm = makeRowMutationsWithColumnCDeleted();
+ boolean res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(rm));
+ assertTrue(res);
+
+ // get row back and assert the values
+ getOneRowAndAssertAllButCExist(table);
+
+ // Test that we get a region level exception
+ try {
+ rm = getBogusRowMutations();
+ table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(rm));
+ fail("Expected NoSuchColumnFamilyException");
+ } catch (NoSuchColumnFamilyException e) {
+ // expected
+ } catch (RetriesExhaustedException e) {
+ assertThat(e.getCause(), instanceOf(NoSuchColumnFamilyException.class));
+ }
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateWithSingleFilter() throws Throwable {
+ try (Table table = createTable()) {
+ // put one row
+ putOneRow(table);
+ // get row back and assert the values
+ getOneRowAndAssertAllExist(table);
+
+ // Put with success
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new SingleColumnValueFilter(FAMILY,
+ Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+ assertTrue(ok);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("b")))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
+
+ // Delete with success
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
+ assertTrue(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
+
+ // Mutate with success
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+ CompareOperator.EQUAL, Bytes.toBytes("b")))
+ .build(new RowMutations(ROWKEY)
+ .add((Mutation) new Put(ROWKEY)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
+ assertTrue(ok);
+
+ result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+ try (Table table = createTable()) {
+ // put one row
+ putOneRow(table);
+ // get row back and assert the values
+ getOneRowAndAssertAllExist(table);
+
+ // Put with success
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+ assertTrue(ok);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ // Put with failure
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("c"))))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
+
+ // Delete with success
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
+ assertTrue(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
+
+ // Mutate with success
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new RowMutations(ROWKEY)
+ .add((Mutation) new Put(ROWKEY)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+ .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
+ assertTrue(ok);
+
+ result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+ try (Table table = createTable()) {
+ // Write A at an explicit timestamp (100) so the TimestampsFilter below has a known value
+ table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+ // Put with success: the filter list selects family/qualifier A at timestamp 100, which exists
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+ new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+ new TimestampsFilter(Collections.singletonList(100L))))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+ assertTrue(ok);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure: same filter list but timestamp 101, which this test never wrote
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+ new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+ new TimestampsFilter(Collections.singletonList(101L))))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+ try (Table table = createTable()) {
+ // Write A at an explicit timestamp (100) so the time ranges below can include or exclude it
+ table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+ // Put with success: the value filter matches and the range is between(0, 101)
+ boolean ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new SingleColumnValueFilter(FAMILY,
+ Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 101))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+ assertTrue(ok);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Put with failure: same filter, but between(0, 100) (upper bound presumably exclusive)
+ ok = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+ CompareOperator.EQUAL, Bytes.toBytes("a")))
+ .timeRange(TimeRange.between(0, 100))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
+ assertFalse(ok);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testCheckAndMutateBuilderWithoutCondition() {
+ CheckAndMutate.newBuilder(ROWKEY)
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+ }
+
+ // Tests for batch version of checkAndMutate
+
+ // Batch checkAndMutate covering Put, Delete and RowMutations. In each pair the first action's
+ // condition matches the stored value and the second's does not, so only the first is applied.
+ @Test
+ public void testCheckAndMutateBatch() throws Throwable {
+ try (Table table = createTable()) {
+ table.put(Arrays.asList(
+ new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+ new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+ new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+ new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+
+ // Test for Put
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
+
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+ .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
+ .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
+
+ boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
+ assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Test for Delete
+ checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
+ .build(new Delete(ROWKEY));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+ .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
+ .build(new Delete(ROWKEY2));
+
+ results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Test for RowMutations
+ checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY3)
+ .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+ .build(new RowMutations(ROWKEY3)
+ .add((Mutation) new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
+ .add((Mutation) new Delete(ROWKEY3).addColumns(FAMILY, Bytes.toBytes("C"))));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY4)
+ .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
+ .build(new RowMutations(ROWKEY4)
+ .add((Mutation) new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
+ .add((Mutation) new Delete(ROWKEY4).addColumns(FAMILY, Bytes.toBytes("D"))));
+
+ results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+ // ROWKEY3: F was added and C, the column the RowMutations deleted, must be gone.
+ result = table.get(new Get(ROWKEY3));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+ assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
+ // ROWKEY4: the condition failed, so F was not added and D is still present.
+ result = table.get(new Get(ROWKEY4));
+ assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateBatch2() throws Throwable {
+ try (Table table = createTable()) {
+ table.put(Arrays.asList(
+ new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+ new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+ new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
+ new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))));
+ // Exercises batch checkAndMutate with ifNotExists(), ifMatches() with an operator, and timeRange().
+ // Test for ifNotExists(): ROWKEY was seeded without column B (passes), ROWKEY2 with it (fails)
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+ .ifNotExists(FAMILY, Bytes.toBytes("B"))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
+
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+ .ifNotExists(FAMILY, Bytes.toBytes("B"))
+ .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
+
+ boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
+ assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Test for ifMatches(): A now holds "e" (NOT_EQUAL "a" passes); B equals "b" (GREATER fails)
+ checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+ .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
+ .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
+
+ results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+
+ result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
+ assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("B")));
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // Test for timeRange(): C and D were written at ts 100; between(0, 101) passes, between(0, 100) fails
+ checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY3)
+ .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+ .timeRange(TimeRange.between(0, 101))
+ .build(new Put(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY4)
+ .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
+ .timeRange(TimeRange.between(0, 100))
+ .build(new Put(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
+
+ results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+
+ result = table.get(new Get(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C")));
+ assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+
+ result = table.get(new Get(ROWKEY4).addColumn(FAMILY, Bytes.toBytes("D")));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateBatchWithFilter() throws Throwable {
+ try (Table table = createTable()) {
+ table.put(Arrays.asList(
+ new Put(ROWKEY)
+ .addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
+ .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+ new Put(ROWKEY2)
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
+ .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
+ .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))));
+
+ // Test for Put
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
+
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
+
+ boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")));
+ assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+
+ result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+
+ // Test for Delete
+ checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("C")));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new Delete(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
+
+ results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+
+ assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+
+ result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+
+ // Test for RowMutations
+ checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new RowMutations(ROWKEY)
+ .add((Mutation) new Put(ROWKEY)
+ .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
+ .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
+
+ checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .build(new RowMutations(ROWKEY2)
+ .add((Mutation) new Put(ROWKEY2)
+ .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
+ .add((Mutation) new Delete(ROWKEY2).addColumns(FAMILY, Bytes.toBytes("D"))));
+
+ results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+
+ result = table.get(new Get(ROWKEY));
+ assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
+ assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+
+ result = table.get(new Get(ROWKEY2));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+ }
+ }
+
+ @Test
+ public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
+ try (Table table = createTable()) {
+ table.put(Arrays.asList(
+ new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
+ .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
+ .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+ new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
+ .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
+ .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))));
+ // A, B, D, E were written at ts 100. Filters match and the range is between(0, 101): must succeed.
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+ Bytes.toBytes("b"))))
+ .timeRange(TimeRange.between(0, 101))
+ .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
+ // Filters match here too, but with between(0, 100) it must fail (upper bound presumably exclusive).
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
+ Bytes.toBytes("d")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
+ Bytes.toBytes("e"))))
+ .timeRange(TimeRange.between(0, 100))
+ .build(new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
+
+ boolean[] results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+ assertTrue(results[0]);
+ assertFalse(results[1]);
+ // The successful action overwrote C on ROWKEY.
+ Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")));
+ assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
+ // The failed action left F on ROWKEY2 at its seeded value.
+ result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("F")));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 3de5c1b..1e281fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -453,6 +453,60 @@ public class TestFromClientSide3 {
}
@Test
+ public void testBatchWithCheckAndMutate() throws Exception {
+ try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] row2 = Bytes.toBytes("row2");
+ byte[] row3 = Bytes.toBytes("row3");
+ byte[] row4 = Bytes.toBytes("row4");
+ byte[] row5 = Bytes.toBytes("row5");
+ // Mixes CheckAndMutate with Get, RowMutations and Put in one batch() call; seed one column per row.
+ table.put(Arrays.asList(
+ new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
+ new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+ new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
+ new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
+ new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
+ // checkAndMutate1 compares A with its seeded value "a"; checkAndMutate2 compares D ("d") with "a".
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("g")));
+ Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
+ RowMutations mutations = new RowMutations(row3)
+ .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
+ .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")));
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
+ .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
+ .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
+ Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
+
+ List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
+ Object[] results = new Object[actions.size()];
+ table.batch(actions, results);
+ // CheckAndMutate and RowMutations outcomes are read via getExists(); the Put slot is an empty Result.
+ assertTrue(((Result) results[0]).getExists());
+ assertEquals("b",
+ Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
+ assertTrue(((Result) results[2]).getExists());
+ assertFalse(((Result) results[3]).getExists());
+ assertTrue(((Result) results[4]).isEmpty());
+ // Verify stored data: row1 updated, row3 mutated, row4 left alone, row5 overwritten.
+ Result result = table.get(new Get(row1));
+ assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
+
+ result = table.get(new Get(row3));
+ assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
+
+ result = table.get(new Get(row4));
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+ result = table.get(new Get(row5));
+ assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
+ }
+ }
+
+ @Test
public void testHTableExistsMethodSingleRegionSingleGet()
throws IOException, InterruptedException {
try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index 655225a..dc8c4ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -252,9 +252,8 @@ public class TestMalformedCellFromClient {
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
- ClientProtos.MultiRequest request =
- ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
- .setCondition(condition).build();
+ ClientProtos.MultiRequest request = ClientProtos.MultiRequest.newBuilder()
+ .addRegionAction(builder.setCondition(condition).build()).build();
return request;
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
index 30b1fa1..19154d6 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@@ -432,6 +433,16 @@ public class ThriftTable implements Table {
}
@Override
+ public boolean checkAndMutate(CheckAndMutate checkAndMutate) { // placeholder: single CheckAndMutate is not implemented in ThriftTable
+ throw new NotImplementedException("Implement later"); // unconditional; the argument is never read and nothing is returned
+ }
+
+ @Override
+ public boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) { // placeholder: batch CheckAndMutate is not implemented in ThriftTable
+ throw new NotImplementedException("Implement later"); // unconditional; the list is never read and no result array is built
+ }
+
+ @Override
public void mutateRow(RowMutations rm) throws IOException {
TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
try {