You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2021/03/30 14:20:37 UTC
[hbase] branch branch-2.4 updated: HBASE-25703 Support conditional
update in MultiRowMutationEndpoint (#3107)
This is an automated email from the ASF dual-hosted git repository.
brfrn169 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 6f4ab4a HBASE-25703 Support conditional update in MultiRowMutationEndpoint (#3107)
6f4ab4a is described below
commit 6f4ab4a13458c4e9aeedb4fac43e66f5e67f3f38
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Tue Mar 30 23:04:29 2021 +0900
HBASE-25703 Support conditional update in MultiRowMutationEndpoint (#3107)
Signed-off-by: Michael Stack <st...@apache.org>
---
.../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 52 ++++
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 35 +++
.../hbase/shaded/protobuf/RequestConverter.java | 45 +--
.../src/main/protobuf/Client.proto | 4 +-
.../src/main/protobuf/MultiRowMutation.proto | 1 +
.../coprocessor/MultiRowMutationEndpoint.java | 176 ++++++++++-
.../hadoop/hbase/client/TestFromClientSide5.java | 329 ++++++++++++++++++++-
.../hbase/client/TestMalformedCellFromClient.java | 4 +-
8 files changed, 579 insertions(+), 67 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5bba1e1..dc7f7d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
@@ -751,6 +753,9 @@ public final class ProtobufUtil {
*/
public static Mutation toMutation(final MutationProto proto) throws IOException {
MutationType type = proto.getMutateType();
+ if (type == MutationType.INCREMENT) {
+ return toIncrement(proto, null);
+ }
if (type == MutationType.APPEND) {
return toAppend(proto, null);
}
@@ -1790,4 +1795,51 @@ public final class ProtobufUtil {
.setTo(timeRange.getMax())
.build();
}
+
+ public static TimeRange toTimeRange(HBaseProtos.TimeRange timeRange) {
+ if (timeRange == null) {
+ return TimeRange.allTime();
+ }
+ if (timeRange.hasFrom()) {
+ if (timeRange.hasTo()) {
+ return TimeRange.between(timeRange.getFrom(), timeRange.getTo());
+ } else {
+ return TimeRange.from(timeRange.getFrom());
+ }
+ } else {
+ return TimeRange.until(timeRange.getTo());
+ }
+ }
+
+ public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
+ final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+ final TimeRange timeRange) throws IOException {
+
+ ClientProtos.Condition.Builder builder = ClientProtos.Condition.newBuilder()
+ .setRow(ByteStringer.wrap(row));
+
+ if (filter != null) {
+ builder.setFilter(ProtobufUtil.toFilter(filter));
+ } else {
+ builder.setFamily(ByteStringer.wrap(family))
+ .setQualifier(ByteStringer.wrap(
+ qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
+ .setComparator(
+ ProtobufUtil.toComparator(new BinaryComparator(value)))
+ .setCompareType(HBaseProtos.CompareType.valueOf(op.name()));
+ }
+
+ return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
+ }
+
+ public static ClientProtos.Condition toCondition(final byte[] row, final Filter filter,
+ final TimeRange timeRange) throws IOException {
+ return toCondition(row, null, null, null, null, filter, timeRange);
+ }
+
+ public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
+ final byte[] qualifier, final CompareOperator op, final byte[] value,
+ final TimeRange timeRange) throws IOException {
+ return toCondition(row, family, qualifier, op, value, null, timeRange);
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 9064ded..cb471d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
@@ -917,6 +918,9 @@ public final class ProtobufUtil {
*/
public static Mutation toMutation(final MutationProto proto) throws IOException {
MutationType type = proto.getMutateType();
+ if (type == MutationType.INCREMENT) {
+ return toIncrement(proto, null);
+ }
if (type == MutationType.APPEND) {
return toAppend(proto, null);
}
@@ -3632,6 +3636,37 @@ public final class ProtobufUtil {
}
}
+ public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
+ final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+ final TimeRange timeRange) throws IOException {
+
+ ClientProtos.Condition.Builder builder = ClientProtos.Condition.newBuilder()
+ .setRow(UnsafeByteOperations.unsafeWrap(row));
+
+ if (filter != null) {
+ builder.setFilter(ProtobufUtil.toFilter(filter));
+ } else {
+ builder.setFamily(UnsafeByteOperations.unsafeWrap(family))
+ .setQualifier(UnsafeByteOperations.unsafeWrap(
+ qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
+ .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)))
+ .setCompareType(HBaseProtos.CompareType.valueOf(op.name()));
+ }
+
+ return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
+ }
+
+ public static ClientProtos.Condition toCondition(final byte[] row, final Filter filter,
+ final TimeRange timeRange) throws IOException {
+ return toCondition(row, null, null, null, null, filter, timeRange);
+ }
+
+ public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
+ final byte[] qualifier, final CompareOperator op, final byte[] value,
+ final TimeRange timeRange) throws IOException {
+ return toCondition(row, family, qualifier, op, value, null, timeRange);
+ }
+
public static List<LogEntry> toBalancerDecisionResponse(
HBaseProtos.LogEntry logEntry) {
try {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 21644f4..bdae13b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.master.RegionState;
@@ -102,7 +101,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationPr
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
@@ -257,7 +255,7 @@ public final class RequestConverter {
builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation));
}
return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
- .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
+ .setCondition(ProtobufUtil.toCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
@@ -271,8 +269,8 @@ public final class RequestConverter {
final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
- return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op,
- value, filter, timeRange), nonceGroup, nonce);
+ return buildMultiRequest(regionName, rowMutations, ProtobufUtil.toCondition(row, family,
+ qualifier, op, value, filter, timeRange), nonceGroup, nonce);
}
/**
@@ -666,8 +664,9 @@ public final class RequestConverter {
getRegionActionBuilderWithRegion(builder, regionName);
CheckAndMutate cam = (CheckAndMutate) action.getAction();
- builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(),
- cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange()));
+ builder.setCondition(ProtobufUtil.toCondition(cam.getRow(), cam.getFamily(),
+ cam.getQualifier(), cam.getCompareOp(), cam.getValue(), cam.getFilter(),
+ cam.getTimeRange()));
if (cam.getAction() instanceof Put) {
actionBuilder.clear();
@@ -832,8 +831,9 @@ public final class RequestConverter {
getRegionActionBuilderWithRegion(builder, regionName);
CheckAndMutate cam = (CheckAndMutate) action.getAction();
- builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(),
- cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange()));
+ builder.setCondition(ProtobufUtil.toCondition(cam.getRow(), cam.getFamily(),
+ cam.getQualifier(), cam.getCompareOp(), cam.getValue(), cam.getFilter(),
+ cam.getTimeRange()));
if (cam.getAction() instanceof Put) {
actionBuilder.clear();
@@ -1188,31 +1188,6 @@ public final class RequestConverter {
}
/**
- * Create a protocol buffer Condition
- *
- * @return a Condition
- * @throws IOException
- */
- public static Condition buildCondition(final byte[] row, final byte[] family,
- final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
- final TimeRange timeRange) throws IOException {
-
- Condition.Builder builder = Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row));
-
- if (filter != null) {
- builder.setFilter(ProtobufUtil.toFilter(filter));
- } else {
- builder.setFamily(UnsafeByteOperations.unsafeWrap(family))
- .setQualifier(UnsafeByteOperations.unsafeWrap(
- qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
- .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)))
- .setCompareType(CompareType.valueOf(op.name()));
- }
-
- return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
- }
-
- /**
* Create a protocol buffer AddColumnRequest
*
* @param tableName
@@ -1245,7 +1220,7 @@ public final class RequestConverter {
final long nonceGroup,
final long nonce) {
DeleteColumnRequest.Builder builder = DeleteColumnRequest.newBuilder();
- builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
+ builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
builder.setColumnName(UnsafeByteOperations.unsafeWrap(columnName));
builder.setNonceGroup(nonceGroup);
builder.setNonce(nonce);
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 1b8b986..13917b6 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -132,8 +132,8 @@ message GetResponse {
}
/**
- * Condition to check if the value of a given cell (row,
- * family, qualifier) matches a value via a given comparator.
+ * Condition to check if the value of a given cell (row, family, qualifier) matches a value via a
+ * given comparator or the value of a given cell matches a given filter.
*
* Condition is used in check and mutate operations.
*/
diff --git a/hbase-protocol/src/main/protobuf/MultiRowMutation.proto b/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
index d3140e9..571e633 100644
--- a/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
+++ b/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
@@ -37,6 +37,7 @@ message MutateRowsRequest {
optional uint64 nonce_group = 2;
optional uint64 nonce = 3;
optional RegionSpecifier region = 4;
+ repeated Condition condition = 5;
}
message MutateRowsResponse {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
index d230773..c840d54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
@@ -25,31 +25,43 @@ import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
- * This class demonstrates how to implement atomic multi row transactions using
- * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)}
- * and Coprocessor endpoints.
+ * This class implements atomic multi row transactions using
+ * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} and Coprocessor
+ * endpoints. We can also specify some conditions to perform conditional update.
*
* Defines a protocol to perform multi row transactions.
* See {@link MultiRowMutationEndpoint} for the implementation.
@@ -60,18 +72,29 @@ import com.google.protobuf.Service;
* <br>
* Example:
* <code>
- * List<Mutation> mutations = ...;
- * Put p1 = new Put(row1);
- * Put p2 = new Put(row2);
+ * Put p = new Put(row1);
+ * Delete d = new Delete(row2);
+ * Increment i = new Increment(row3);
+ * Append a = new Append(row4);
* ...
- * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p1);
- * Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p2);
+ * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p);
+ * Mutate m2 = ProtobufUtil.toMutate(MutateType.DELETE, d);
+ * Mutate m3 = ProtobufUtil.toMutate(MutateType.INCREMENT, i);
+ * Mutate m4 = ProtobufUtil.toMutate(MutateType.Append, a);
+ *
* MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
* mrmBuilder.addMutationRequest(m1);
* mrmBuilder.addMutationRequest(m2);
+ * mrmBuilder.addMutationRequest(m3);
+ * mrmBuilder.addMutationRequest(m4);
+ *
+ * // We can also specify conditions to preform conditional update
+ * mrmBuilder.addCondition(ProtobufUtil.toCondition(row, FAMILY, QUALIFIER,
+ * CompareOperator.EQUAL, value, TimeRange.allTime()));
+ *
* CoprocessorRpcChannel channel = t.coprocessorService(ROW);
* MultiRowMutationService.BlockingInterface service =
- * MultiRowMutationService.newBlockingStub(channel);
+ * MultiRowMutationService.newBlockingStub(channel);
* MutateRowsRequest mrm = mrmBuilder.build();
* service.mutateRows(null, mrm);
* </code>
@@ -79,11 +102,16 @@ import com.google.protobuf.Service;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HRegion.class);
+
private RegionCoprocessorEnvironment env;
+
@Override
public void mutateRows(RpcController controller, MutateRowsRequest request,
RpcCallback<MutateRowsResponse> done) {
MutateRowsResponse response = MutateRowsResponse.getDefaultInstance();
+
+ List<Region.RowLock> rowLocks = null;
try {
// set of rows to lock, sorted to avoid deadlocks
SortedSet<byte[]> rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR);
@@ -93,7 +121,9 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
mutations.add(ProtobufUtil.toMutation(m));
}
- RegionInfo regionInfo = env.getRegion().getRegionInfo();
+ Region region = env.getRegion();
+
+ RegionInfo regionInfo = region.getRegionInfo();
for (Mutation m : mutations) {
// check whether rows are in range for this region
if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
@@ -110,16 +140,134 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
}
rowsToLock.add(m.getRow());
}
- // call utility method on region
- long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
- long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
- env.getRegion().mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+
+ boolean matches = true;
+ if (request.getConditionCount() > 0) {
+ // Get row locks for the mutations and the conditions
+ rowLocks = new ArrayList<>();
+ for (ClientProtos.Condition condition : request.getConditionList()) {
+ rowsToLock.add(condition.getRow().toByteArray());
+ }
+ for (byte[] row : rowsToLock) {
+ try {
+ Region.RowLock rowLock = region.getRowLock(row, false); // write lock
+ rowLocks.add(rowLock);
+ } catch (IOException ioe) {
+ LOGGER.warn("Failed getting lock, row={}, in region {}", Bytes.toStringBinary(row),
+ this, ioe);
+ throw ioe;
+ }
+ }
+
+ // Check if all the conditions match
+ for (ClientProtos.Condition condition : request.getConditionList()) {
+ if (!matches(region, condition)) {
+ matches = false;
+ break;
+ }
+ }
+ }
+
+ if (matches) {
+ // call utility method on region
+ long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
+ long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
+ region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
+ } finally {
+ if (rowLocks != null) {
+ // Release the acquired row locks
+ for (Region.RowLock rowLock : rowLocks) {
+ rowLock.release();
+ }
+ }
}
done.run(response);
}
+ private boolean matches(Region region, ClientProtos.Condition condition) throws IOException {
+ byte[] row = condition.getRow().toByteArray();
+
+ Filter filter = null;
+ byte[] family = null;
+ byte[] qualifier = null;
+ CompareOperator op = null;
+ ByteArrayComparable comparator = null;
+
+ if (condition.hasFilter()) {
+ filter = ProtobufUtil.toFilter(condition.getFilter());
+ } else {
+ family = condition.getFamily().toByteArray();
+ qualifier = condition.getQualifier().toByteArray();
+ op = CompareOperator.valueOf(condition.getCompareType().name());
+ comparator = ProtobufUtil.toComparator(condition.getComparator());
+ }
+
+ TimeRange timeRange = condition.hasTimeRange() ?
+ ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
+
+ Get get = new Get(row);
+ if (family != null) {
+ checkFamily(region, family);
+ get.addColumn(family, qualifier);
+ }
+ if (filter != null) {
+ get.setFilter(filter);
+ }
+ if (timeRange != null) {
+ get.setTimeRange(timeRange.getMin(), timeRange.getMax());
+ }
+
+ List<Cell> result = region.get(get, false);
+ boolean matches = false;
+ if (filter != null) {
+ if (!result.isEmpty()) {
+ matches = true;
+ }
+ } else {
+ boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
+ if (result.isEmpty() && valueIsNull) {
+ matches = true;
+ } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
+ matches = true;
+ } else if (result.size() == 1 && !valueIsNull) {
+ Cell kv = result.get(0);
+ int compareResult = PrivateCellUtil.compareValue(kv, comparator);
+ matches = matches(op, compareResult);
+ }
+ }
+ return matches;
+ }
+
+ private void checkFamily(Region region, byte[] family) throws NoSuchColumnFamilyException {
+ if (!region.getTableDescriptor().hasColumnFamily(family)) {
+ throw new NoSuchColumnFamilyException(
+ "Column family " + Bytes.toString(family) + " does not exist in region " + this
+ + " in table " + region.getTableDescriptor());
+ }
+ }
+
+ private boolean matches(CompareOperator op, int compareResult) {
+ switch (op) {
+ case LESS:
+ return compareResult < 0;
+ case LESS_OR_EQUAL:
+ return compareResult <= 0;
+ case EQUAL:
+ return compareResult == 0;
+ case NOT_EQUAL:
+ return compareResult != 0;
+ case GREATER_OR_EQUAL:
+ return compareResult >= 0;
+ case GREATER:
+ return compareResult > 0;
+ default:
+ throw new RuntimeException("Unknown Compare op " + op.name());
+ }
+ }
+
@Override
public Iterable<Service> getServices() {
return Collections.singleton(this);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
index 19256fd..809fd2a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.io.TimeRange;
@@ -265,30 +266,330 @@ public class TestFromClientSide5 extends FromClientSideBase {
LOG.info("Starting testMultiRowMutation");
final TableName tableName = name.getTableName();
final byte [] ROW1 = Bytes.toBytes("testRow1");
+ final byte [] ROW2 = Bytes.toBytes("testRow2");
+ final byte [] ROW3 = Bytes.toBytes("testRow3");
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
- Put p = new Put(ROW);
- p.addColumn(FAMILY, QUALIFIER, VALUE);
- MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
+ // Add initial data
+ t.batch(Arrays.asList(
+ new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE),
+ new Put(ROW2).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(1L)),
+ new Put(ROW3).addColumn(FAMILY, QUALIFIER, VALUE)
+ ), new Object[3]);
- p = new Put(ROW1);
- p.addColumn(FAMILY, QUALIFIER, VALUE);
- MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, p);
+ // Execute MultiRowMutation
+ Put put = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+ MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put);
+
+ Delete delete = new Delete(ROW1);
+ MutationProto m2 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+ Increment increment = new Increment(ROW2).addColumn(FAMILY, QUALIFIER, 1L);
+ MutationProto m3 = ProtobufUtil.toMutation(MutationType.INCREMENT, increment);
+
+ Append append = new Append(ROW3).addColumn(FAMILY, QUALIFIER, VALUE);
+ MutationProto m4 = ProtobufUtil.toMutation(MutationType.APPEND, append);
MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
mrmBuilder.addMutationRequest(m1);
mrmBuilder.addMutationRequest(m2);
- MutateRowsRequest mrm = mrmBuilder.build();
+ mrmBuilder.addMutationRequest(m3);
+ mrmBuilder.addMutationRequest(m4);
+
CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel);
- service.mutateRows(null, mrm);
- Get g = new Get(ROW);
- Result r = t.get(g);
- assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));
- g = new Get(ROW1);
- r = t.get(g);
- assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));
+ service.mutateRows(null, mrmBuilder.build());
+
+ // Assert
+ Result r = t.get(new Get(ROW));
+ assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+ r = t.get(new Get(ROW1));
+ assertTrue(r.isEmpty());
+
+ r = t.get(new Get(ROW2));
+ assertEquals(2L, Bytes.toLong(r.getValue(FAMILY, QUALIFIER)));
+
+ r = t.get(new Get(ROW3));
+ assertEquals(Bytes.toString(VALUE) + Bytes.toString(VALUE),
+ Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+ }
+ }
+
+ @Test
+ public void testMultiRowMutationWithSingleConditionWhenConditionMatches() throws Exception {
+ final TableName tableName = name.getTableName();
+ final byte [] ROW1 = Bytes.toBytes("testRow1");
+ final byte [] ROW2 = Bytes.toBytes("testRow2");
+ final byte [] VALUE1 = Bytes.toBytes("testValue1");
+ final byte [] VALUE2 = Bytes.toBytes("testValue2");
+
+ try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+ // Add initial data
+ t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2));
+
+ // Execute MultiRowMutation with conditions
+ Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+ MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+ Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+ MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+ Delete delete = new Delete(ROW2);
+ MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+ MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+ mrmBuilder.addMutationRequest(m1);
+ mrmBuilder.addMutationRequest(m2);
+ mrmBuilder.addMutationRequest(m3);
+ mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER,
+ CompareOperator.EQUAL, VALUE2, null));
+
+ CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+ MultiRowMutationService.BlockingInterface service =
+ MultiRowMutationService.newBlockingStub(channel);
+ service.mutateRows(null, mrmBuilder.build());
+
+ // Assert
+ Result r = t.get(new Get(ROW));
+ assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+ r = t.get(new Get(ROW1));
+ assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+ r = t.get(new Get(ROW2));
+ assertTrue(r.isEmpty());
+ }
+ }
+
+ @Test
+ public void testMultiRowMutationWithSingleConditionWhenConditionNotMatch() throws Exception {
+ final TableName tableName = name.getTableName();
+ final byte [] ROW1 = Bytes.toBytes("testRow1");
+ final byte [] ROW2 = Bytes.toBytes("testRow2");
+ final byte [] VALUE1 = Bytes.toBytes("testValue1");
+ final byte [] VALUE2 = Bytes.toBytes("testValue2");
+
+ try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+ // Add initial data
+ t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2));
+
+ // Execute MultiRowMutation with conditions
+ Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+ MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+ Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+ MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+ Delete delete = new Delete(ROW2);
+ MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+ MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+ mrmBuilder.addMutationRequest(m1);
+ mrmBuilder.addMutationRequest(m2);
+ mrmBuilder.addMutationRequest(m3);
+ mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER,
+ CompareOperator.EQUAL, VALUE1, null));
+
+ CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+ MultiRowMutationService.BlockingInterface service =
+ MultiRowMutationService.newBlockingStub(channel);
+ service.mutateRows(null, mrmBuilder.build());
+
+ // Assert
+ Result r = t.get(new Get(ROW));
+ assertTrue(r.isEmpty());
+
+ r = t.get(new Get(ROW1));
+ assertTrue(r.isEmpty());
+
+ r = t.get(new Get(ROW2));
+ assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+ }
+ }
+
+ @Test
+ public void testMultiRowMutationWithMultipleConditionsWhenConditionsMatch() throws Exception {
+ final TableName tableName = name.getTableName();
+ final byte [] ROW1 = Bytes.toBytes("testRow1");
+ final byte [] ROW2 = Bytes.toBytes("testRow2");
+ final byte [] VALUE1 = Bytes.toBytes("testValue1");
+ final byte [] VALUE2 = Bytes.toBytes("testValue2");
+
+ try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+ // Add initial data
+ t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2));
+
+ // Execute MultiRowMutation with conditions
+ Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+ MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+ Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+ MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+ Delete delete = new Delete(ROW2);
+ MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+ MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+ mrmBuilder.addMutationRequest(m1);
+ mrmBuilder.addMutationRequest(m2);
+ mrmBuilder.addMutationRequest(m3);
+ mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW, FAMILY, QUALIFIER,
+ CompareOperator.EQUAL, null, null));
+ mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER,
+ CompareOperator.EQUAL, VALUE2, null));
+
+ CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+ MultiRowMutationService.BlockingInterface service =
+ MultiRowMutationService.newBlockingStub(channel);
+ service.mutateRows(null, mrmBuilder.build());
+
+ // Assert
+ Result r = t.get(new Get(ROW));
+ assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+ r = t.get(new Get(ROW1));
+ assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+ r = t.get(new Get(ROW2));
+ assertTrue(r.isEmpty());
+ }
+ }
+
+ @Test
+ public void testMultiRowMutationWithMultipleConditionsWhenConditionsNotMatch() throws Exception {
+ final TableName tableName = name.getTableName();
+ final byte [] ROW1 = Bytes.toBytes("testRow1");
+ final byte [] ROW2 = Bytes.toBytes("testRow2");
+ final byte [] VALUE1 = Bytes.toBytes("testValue1");
+ final byte [] VALUE2 = Bytes.toBytes("testValue2");
+
+ try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+ // Add initial data
+ t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2));
+
+ // Execute MultiRowMutation with conditions
+ Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+ MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+ Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+ MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+ Delete delete = new Delete(ROW2);
+ MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+ MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+ mrmBuilder.addMutationRequest(m1);
+ mrmBuilder.addMutationRequest(m2);
+ mrmBuilder.addMutationRequest(m3);
+ mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW1, FAMILY, QUALIFIER,
+ CompareOperator.EQUAL, null, null));
+ mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER,
+ CompareOperator.EQUAL, VALUE1, null));
+
+ CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+ MultiRowMutationService.BlockingInterface service =
+ MultiRowMutationService.newBlockingStub(channel);
+ service.mutateRows(null, mrmBuilder.build());
+
+ // Assert
+ Result r = t.get(new Get(ROW));
+ assertTrue(r.isEmpty());
+
+ r = t.get(new Get(ROW1));
+ assertTrue(r.isEmpty());
+
+ r = t.get(new Get(ROW2));
+ assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+ }
+ }
+
+ @Test
+ public void testMultiRowMutationWithFilterConditionWhenConditionMatches() throws Exception {
+ final TableName tableName = name.getTableName();
+ final byte [] ROW1 = Bytes.toBytes("testRow1");
+ final byte [] ROW2 = Bytes.toBytes("testRow2");
+ final byte [] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+ final byte [] VALUE1 = Bytes.toBytes("testValue1");
+ final byte [] VALUE2 = Bytes.toBytes("testValue2");
+ final byte [] VALUE3 = Bytes.toBytes("testValue3");
+
+ try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+ // Add initial data
+ t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2)
+ .addColumn(FAMILY, QUALIFIER2, VALUE3));
+
+ // Execute MultiRowMutation with conditions
+ Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+ MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+ Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+ MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+ Delete delete = new Delete(ROW2);
+ MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+ MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+ mrmBuilder.addMutationRequest(m1);
+ mrmBuilder.addMutationRequest(m2);
+ mrmBuilder.addMutationRequest(m3);
+ mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, new FilterList(
+ new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOperator.EQUAL, VALUE2),
+ new SingleColumnValueFilter(FAMILY, QUALIFIER2, CompareOperator.EQUAL, VALUE3)), null));
+
+ CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+ MultiRowMutationService.BlockingInterface service =
+ MultiRowMutationService.newBlockingStub(channel);
+ service.mutateRows(null, mrmBuilder.build());
+
+ // Assert
+ Result r = t.get(new Get(ROW));
+ assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+ r = t.get(new Get(ROW1));
+ assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+ r = t.get(new Get(ROW2));
+ assertTrue(r.isEmpty());
+ }
+ }
+
+ @Test
+ public void testMultiRowMutationWithFilterConditionWhenConditionNotMatch() throws Exception {
+ final TableName tableName = name.getTableName();
+ final byte [] ROW1 = Bytes.toBytes("testRow1");
+ final byte [] ROW2 = Bytes.toBytes("testRow2");
+ final byte [] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+ final byte [] VALUE1 = Bytes.toBytes("testValue1");
+ final byte [] VALUE2 = Bytes.toBytes("testValue2");
+ final byte [] VALUE3 = Bytes.toBytes("testValue3");
+
+ try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+ // Add initial data
+ t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2)
+ .addColumn(FAMILY, QUALIFIER2, VALUE3));
+
+ // Execute MultiRowMutation with conditions
+ Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+ MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+ Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+ MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+ Delete delete = new Delete(ROW2);
+ MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+ MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+ mrmBuilder.addMutationRequest(m1);
+ mrmBuilder.addMutationRequest(m2);
+ mrmBuilder.addMutationRequest(m3);
+ mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, new FilterList(
+ new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOperator.EQUAL, VALUE2),
+ new SingleColumnValueFilter(FAMILY, QUALIFIER2, CompareOperator.EQUAL, VALUE2)), null));
+
+ CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+ MultiRowMutationService.BlockingInterface service =
+ MultiRowMutationService.newBlockingStub(channel);
+ service.mutateRows(null, mrmBuilder.build());
+
+ // Assert
+ Result r = t.get(new Get(ROW));
+ assertTrue(r.isEmpty());
+
+ r = t.get(new Get(ROW1));
+ assertTrue(r.isEmpty());
+
+ r = t.get(new Get(ROW2));
+ assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index c23bc7d..7816110 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -237,8 +237,8 @@ public class TestMalformedCellFromClient {
builder.setAtomic(true);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
- ClientProtos.Condition condition = RequestConverter
- .buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null);
+ ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null,
+ CompareOperator.EQUAL, new byte[10], null, null);
for (Mutation mutation : rm.getMutations()) {
ClientProtos.MutationProto.MutationType mutateType = null;
if (mutation instanceof Put) {