You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2021/03/21 14:43:53 UTC
[hbase] branch branch-2.4 updated: HBASE-25678 Support nonce
operations for Increment/Append in RowMutations and CheckAndMutate
This is an automated email from the ASF dual-hosted git repository.
brfrn169 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 4cf28c4 HBASE-25678 Support nonce operations for Increment/Append in RowMutations and CheckAndMutate
4cf28c4 is described below
commit 4cf28c43f589f8be0daa3f9b9846642b72c1b8ec
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Sun Mar 21 22:47:53 2021 +0900
HBASE-25678 Support nonce operations for Increment/Append in RowMutations and CheckAndMutate
Signed-off-by: stack <st...@apache.org>
---
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 22 +-
.../apache/hadoop/hbase/client/AsyncProcess.java | 25 +-
.../org/apache/hadoop/hbase/client/HTable.java | 36 +-
.../hadoop/hbase/client/RawAsyncTableImpl.java | 40 +--
.../hbase/shaded/protobuf/RequestConverter.java | 138 +++++---
.../hadoop/hbase/coprocessor/RegionObserver.java | 2 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 59 ++--
.../hadoop/hbase/regionserver/RSRpcServices.java | 54 +--
.../hbase/client/TestAsyncTableNoncedRetry.java | 268 +++++++++++++--
.../hadoop/hbase/client/TestHTableNoncedRetry.java | 365 +++++++++++++++++++++
10 files changed, 851 insertions(+), 158 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 7e05b05..7af385d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -172,7 +172,7 @@ class AsyncBatchRpcRetryingCaller<T> {
} else {
action = new Action(rawAction, i);
}
- if (rawAction instanceof Append || rawAction instanceof Increment) {
+ if (hasIncrementOrAppend(rawAction)) {
action.setNonce(conn.getNonceGenerator().newNonce());
}
this.actions.add(action);
@@ -184,6 +184,26 @@ class AsyncBatchRpcRetryingCaller<T> {
this.startNs = System.nanoTime();
}
+ private static boolean hasIncrementOrAppend(Row action) {
+ if (action instanceof Append || action instanceof Increment) {
+ return true;
+ } else if (action instanceof RowMutations) {
+ return hasIncrementOrAppend((RowMutations) action);
+ } else if (action instanceof CheckAndMutate) {
+ return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
+ }
+ return false;
+ }
+
+ private static boolean hasIncrementOrAppend(RowMutations mutations) {
+ for (Mutation mutation : mutations.getMutations()) {
+ if (mutation instanceof Append || mutation instanceof Increment) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 8cd046f..6071cb6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -398,8 +398,29 @@ class AsyncProcess {
}
private void setNonce(NonceGenerator ng, Row r, Action action) {
- if (!(r instanceof Append) && !(r instanceof Increment)) return;
- action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
+ if (hasIncrementOrAppend(r)) {
+ action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
+ }
+ }
+
+ private static boolean hasIncrementOrAppend(Row action) {
+ if (action instanceof Append || action instanceof Increment) {
+ return true;
+ } else if (action instanceof RowMutations) {
+ return hasIncrementOrAppend((RowMutations) action);
+ } else if (action instanceof CheckAndMutate) {
+ return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
+ }
+ return false;
+ }
+
+ private static boolean hasIncrementOrAppend(RowMutations mutations) {
+ for (Mutation mutation : mutations.getMutations()) {
+ if (mutation instanceof Append || mutation instanceof Increment) {
+ return true;
+ }
+ }
+ return false;
}
private int checkTimeout(String name, int timeout) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a219fed..a04fd26 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -555,17 +554,16 @@ public class HTable implements Table {
@Override
public Result mutateRow(final RowMutations rm) throws IOException {
+ long nonceGroup = getNonceGroup();
+ long nonce = getNonce();
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
rpcControllerFactory.newController(), writeRpcTimeoutMs,
new RetryingTimeTracker().start(), rm.getMaxPriority()) {
@Override
protected MultiResponse rpcCall() throws Exception {
- RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
- getLocation().getRegionInfo().getRegionName(), rm);
- regionMutationBuilder.setAtomic(true);
- MultiRequest request =
- MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+ MultiRequest request = RequestConverter.buildMultiRequest(
+ getLocation().getRegionInfo().getRegionName(), rm, nonceGroup, nonce);
ClientProtos.MultiResponse response = doMulti(request);
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
if (res.hasException()) {
@@ -597,6 +595,14 @@ public class HTable implements Table {
return (Result) results[0];
}
+ private long getNonceGroup() {
+ return ((ClusterConnection) getConnection()).getNonceGenerator().getNonceGroup();
+ }
+
+ private long getNonce() {
+ return ((ClusterConnection) getConnection()).getNonceGenerator().newNonce();
+ }
+
@Override
public Result append(final Append append) throws IOException {
checkHasFamilies(append);
@@ -606,7 +612,8 @@ public class HTable implements Table {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
+ getLocation().getRegionInfo().getRegionName(), append, super.getNonceGroup(),
+ super.getNonce());
MutateResponse response = doMutate(request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
@@ -625,7 +632,8 @@ public class HTable implements Table {
@Override
protected Result rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
+ getLocation().getRegionInfo().getRegionName(), increment, super.getNonceGroup(),
+ super.getNonce());
MutateResponse response = doMutate(request);
// Should this check for null like append does?
return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
@@ -664,7 +672,7 @@ public class HTable implements Table {
protected Long rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildIncrementRequest(
getLocation().getRegionInfo().getRegionName(), row, family,
- qualifier, amount, durability, getNonceGroup(), getNonce());
+ qualifier, amount, durability, super.getNonceGroup(), super.getNonce());
MutateResponse response = doMutate(request);
Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
@@ -737,6 +745,8 @@ public class HTable implements Table {
private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final RowMutations rm) throws IOException {
+ long nonceGroup = getNonceGroup();
+ long nonce = getNonce();
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
@@ -744,8 +754,8 @@ public class HTable implements Table {
@Override
protected MultiResponse rpcCall() throws Exception {
MultiRequest request = RequestConverter
- .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
- qualifier, op, value, filter, timeRange, rm);
+ .buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row, family,
+ qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce);
ClientProtos.MultiResponse response = doMulti(request);
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
if (res.hasException()) {
@@ -822,6 +832,8 @@ public class HTable implements Table {
private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Mutation mutation) throws IOException {
+ long nonceGroup = getNonceGroup();
+ long nonce = getNonce();
ClientServiceCallable<CheckAndMutateResult> callable =
new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), mutation.getPriority()) {
@@ -829,7 +841,7 @@ public class HTable implements Table {
protected CheckAndMutateResult rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
- filter, timeRange, mutation);
+ filter, timeRange, mutation, nonceGroup, nonce);
MutateResponse response = doMutate(request);
if (response.hasResult()) {
return new CheckAndMutateResult(response.getProcessed(),
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 1222d83..bed896e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
/**
* The implementation of RawAsyncTable.
@@ -362,7 +361,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
- null, timeRange, p),
+ null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
@@ -374,7 +373,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
- null, timeRange, d),
+ null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
@@ -387,8 +386,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
- (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
- null, timeRange, rm), CheckAndMutateResult::isSuccess))
+ (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
+ null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ CheckAndMutateResult::isSuccess))
.call();
}
}
@@ -425,7 +425,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
- filter, timeRange, p),
+ filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
@@ -436,7 +436,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
- filter, timeRange, d),
+ filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call();
}
@@ -448,8 +448,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
- (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
- filter, timeRange, rm), CheckAndMutateResult::isSuccess))
+ (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null,
+ filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+ CheckAndMutateResult::isSuccess))
.call();
}
}
@@ -468,6 +469,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
}
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
mutation.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
@@ -475,21 +478,23 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
(rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
- checkAndMutate.getTimeRange(), m),
+ checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
rowMutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) ->
RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
controller, loc, stub, rowMutations,
- (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
+ (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
- checkAndMutate.getTimeRange(), rm),
+ checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
resp -> resp))
.call();
} else {
@@ -554,16 +559,13 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
@Override
public CompletableFuture<Result> mutateRow(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
+ long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+ long nonce = conn.getNonceGenerator().newNonce();
return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
writeRpcTimeoutNs).action((controller, loc, stub) ->
this.<Result, Result> mutateRow(controller, loc, stub, mutations,
- (rn, rm) -> {
- RegionAction.Builder regionMutationBuilder = RequestConverter
- .buildRegionAction(rn, rm);
- regionMutationBuilder.setAtomic(true);
- return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
- .build();
- }, resp -> resp))
+ (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
+ resp -> resp))
.call();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 54b55cc..21644f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -247,39 +247,78 @@ public final class RequestConverter {
*/
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
- final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
- return MutateRequest.newBuilder()
- .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
- .setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
+ final Filter filter, final TimeRange timeRange, final Mutation mutation, long nonceGroup,
+ long nonce) throws IOException {
+ MutateRequest.Builder builder = MutateRequest.newBuilder();
+ if (mutation instanceof Increment || mutation instanceof Append) {
+ builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation, nonce))
+ .setNonceGroup(nonceGroup);
+ } else {
+ builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation));
+ }
+ return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
/**
- * Create a protocol buffer MutateRequest for conditioned row mutations
+ * Create a protocol buffer MultiRequest for conditioned row mutations
*
- * @return a mutate request
+ * @return a multi request
* @throws IOException
*/
- public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
+ public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
- final RowMutations rowMutations) throws IOException {
+ final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
+ return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op,
+ value, filter, timeRange), nonceGroup, nonce);
+ }
+
+ /**
+ * Create a protocol buffer MultiRequest for row mutations
+ *
+ * @return a multi request
+ */
+ public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
+ final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
+ return buildMultiRequest(regionName, rowMutations, null, nonceGroup, nonce);
+ }
+
+ private static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
+ final RowMutations rowMutations, final Condition condition, long nonceGroup, long nonce)
+ throws IOException {
RegionAction.Builder builder =
- getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
+ getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
builder.setAtomic(true);
+
+ boolean hasNonce = false;
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
for (Mutation mutation: rowMutations.getMutations()) {
mutationBuilder.clear();
- MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
- mutationBuilder);
+ MutationProto mp;
+ if (mutation instanceof Increment || mutation instanceof Append) {
+ mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder, nonce);
+ hasNonce = true;
+ } else {
+ mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder);
+ }
actionBuilder.clear();
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
- return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition(
- buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build();
+
+ if (condition != null) {
+ builder.setCondition(condition);
+ }
+
+ MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
+ if (hasNonce) {
+ multiRequestBuilder.setNonceGroup(nonceGroup);
+ }
+
+ return multiRequestBuilder.addRegionAction(builder.build()).build();
}
/**
@@ -362,33 +401,6 @@ public final class RequestConverter {
return builder.build();
}
- /**
- * Create a protocol buffer MultiRequest for row mutations.
- * Does not propagate Action absolute position. Does not set atomic action on the created
- * RegionAtomic. Caller should do that if wanted.
- * @param regionName
- * @param rowMutations
- * @return a data-laden RegionMutation.Builder
- * @throws IOException
- */
- public static RegionAction.Builder buildRegionAction(final byte [] regionName,
- final RowMutations rowMutations)
- throws IOException {
- RegionAction.Builder builder =
- getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
- ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
- MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
- for (Mutation mutation: rowMutations.getMutations()) {
- mutationBuilder.clear();
- MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
- mutationBuilder);
- actionBuilder.clear();
- actionBuilder.setMutation(mp);
- builder.addAction(actionBuilder.build());
- }
- return builder;
- }
-
public static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
@@ -785,9 +797,6 @@ public final class RequestConverter {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
}
- if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
- multiRequestBuilder.setNonceGroup(nonceGroup);
- }
if (builder.getActionCount() > 0) {
multiRequestBuilder.addRegionAction(builder.build());
}
@@ -801,8 +810,11 @@ public final class RequestConverter {
builder.clear();
getRegionActionBuilderWithRegion(builder, regionName);
- buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder,
- mutationBuilder);
+ boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) action.getAction(),
+ cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
+ if (hasIncrementOrAppend) {
+ hasNonce = true;
+ }
builder.setAtomic(true);
multiRequestBuilder.addRegionAction(builder.build());
@@ -836,16 +848,21 @@ public final class RequestConverter {
} else if (cam.getAction() instanceof Increment) {
actionBuilder.clear();
mutationBuilder.clear();
- buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
+ buildNoDataRegionAction((Increment) cam.getAction(), cells, action.getNonce(), builder,
actionBuilder, mutationBuilder);
+ hasNonce = true;
} else if (cam.getAction() instanceof Append) {
actionBuilder.clear();
mutationBuilder.clear();
- buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
+ buildNoDataRegionAction((Append) cam.getAction(), cells, action.getNonce(), builder,
actionBuilder, mutationBuilder);
+ hasNonce = true;
} else if (cam.getAction() instanceof RowMutations) {
- buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
- mutationBuilder);
+ boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) cam.getAction(),
+ cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
+ if (hasIncrementOrAppend) {
+ hasNonce = true;
+ }
builder.setAtomic(true);
} else {
throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
@@ -858,6 +875,10 @@ public final class RequestConverter {
// in the overall multiRequest.
indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
}
+
+ if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
+ multiRequestBuilder.setNonceGroup(nonceGroup);
+ }
}
private static void buildNoDataRegionAction(final Put put, final List<CellScannable> cells,
@@ -907,18 +928,29 @@ public final class RequestConverter {
MutationType.APPEND, append, mutationBuilder, nonce)));
}
- private static void buildNoDataRegionAction(final RowMutations rowMutations,
- final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
+ /**
+ * @return whether or not the rowMutations has a Increment or Append
+ */
+ private static boolean buildNoDataRegionAction(final RowMutations rowMutations,
+ final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
throws IOException {
+ boolean ret = false;
for (Mutation mutation: rowMutations.getMutations()) {
mutationBuilder.clear();
- MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
- mutationBuilder);
+ MutationProto mp;
+ if (mutation instanceof Increment || mutation instanceof Append) {
+ mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder,
+ nonce);
+ ret = true;
+ } else {
+ mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder);
+ }
cells.add(mutation);
actionBuilder.clear();
regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
}
+ return ret;
}
private static MutationType getMutationType(Mutation mutation) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index b1423ed..35415a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1551,7 +1551,7 @@ public interface RegionObserver {
List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
for (Pair<Cell, Cell> pair : cellPairs) {
resultPairs.add(new Pair<>(pair.getFirst(),
- postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
+ postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, pair.getFirst(),
pair.getSecond())));
}
return resultPairs;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0f3649b..4f15442 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3289,8 +3289,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected final Map<byte[], List<Cell>>[] familyCellMaps;
// For Increment/Append operations
protected final Result[] results;
- // For nonce operations
- protected final boolean[] canProceed;
protected final HRegion region;
protected int nextIndexToProcess = 0;
@@ -3306,7 +3304,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.walEditsFromCoprocessors = new WALEdit[operations.length];
familyCellMaps = new Map[operations.length];
this.results = new Result[operations.length];
- this.canProceed = new boolean[operations.length];
this.region = region;
observedExceptions = new ObservedExceptionsInBatch();
@@ -3727,8 +3724,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* of the logic is same.
*/
static class MutationBatchOperation extends BatchOperation<Mutation> {
+ // For nonce operations
private long nonceGroup;
private long nonce;
+ protected boolean canProceed;
+
public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
long nonceGroup, long nonce) {
super(region, operations);
@@ -3840,6 +3840,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
+ // For nonce operations
+ canProceed = startNonceOperation();
+
visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
Mutation mutation = getMutation(index);
if (mutation instanceof Put) {
@@ -3858,8 +3861,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// For nonce operations
- canProceed[index] = startNonceOperation(nonceGroup, nonce);
- if (!canProceed[index]) {
+ if (!canProceed) {
Result result;
if (returnResults) {
// convert duplicate increment/append to get
@@ -3921,11 +3923,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Starts the nonce operation for a mutation, if needed.
- * @param nonceGroup Nonce group from the request.
- * @param nonce Nonce.
* @return whether to proceed this mutation.
*/
- private boolean startNonceOperation(long nonceGroup, long nonce) throws IOException {
+ private boolean startNonceOperation() throws IOException {
if (region.rsServices == null || region.rsServices.getNonceManager() == null
|| nonce == HConstants.NO_NONCE) {
return true;
@@ -3942,11 +3942,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Ends nonce operation for a mutation, if needed.
- * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
- * @param nonce Nonce.
* @param success Whether the operation for this nonce has succeeded.
*/
- private void endNonceOperation(long nonceGroup, long nonce, boolean success) {
+ private void endNonceOperation(boolean success) {
if (region.rsServices != null && region.rsServices.getNonceManager() != null
&& nonce != HConstants.NO_NONCE) {
region.rsServices.getNonceManager().endOperation(nonceGroup, nonce, success);
@@ -4215,13 +4213,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// For nonce operations
- visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
- if (canProceed[i]) {
- endNonceOperation(nonceGroup, nonce,
- retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS);
- }
- return true;
- });
+ if (canProceed && nonce != HConstants.NO_NONCE) {
+ boolean[] areAllIncrementsAndAppendsSuccessful = new boolean[]{true};
+ visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
+ Mutation mutation = getMutation(i);
+ if (mutation instanceof Increment || mutation instanceof Append) {
+ if (retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+ areAllIncrementsAndAppendsSuccessful[0] = false;
+ return false;
+ }
+ }
+ return true;
+ });
+ endNonceOperation(areAllIncrementsAndAppendsSuccessful[0]);
+ }
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
@@ -4478,7 +4483,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
- long nonce) throws IOException {
+ long nonce) throws IOException {
// As it stands, this is used for 3 things
// * batchMutate with single mutation - put/delete/increment/append, separate or from
// checkAndMutate.
@@ -4770,6 +4775,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+ return checkAndMutate(checkAndMutate, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, long nonceGroup,
+ long nonce) throws IOException {
byte[] row = checkAndMutate.getRow();
Filter filter = null;
byte[] family = null;
@@ -4881,9 +4891,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// All edits for the given row (across all column families) must happen atomically.
Result r;
if (mutation != null) {
- r = doBatchMutate(mutation, true).getResult();
+ r = doBatchMutate(mutation, true, nonceGroup, nonce).getResult();
} else {
- r = mutateRow(rowMutations);
+ r = mutateRow(rowMutations, nonceGroup, nonce);
}
this.checkAndMutateChecksPassed.increment();
return new CheckAndMutateResult(true, r);
@@ -8244,9 +8254,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public Result mutateRow(RowMutations rm) throws IOException {
+ return mutateRow(rm, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ public Result mutateRow(RowMutations rm, long nonceGroup, long nonce) throws IOException {
final List<Mutation> m = rm.getMutations();
- OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true,
- HConstants.NO_NONCE, HConstants.NO_NONCE);
+ OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true, nonceGroup, nonce);
List<Result> results = new ArrayList<>();
for (OperationStatus status : statuses) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 288ba76..f7249d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -599,43 +599,47 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
- CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement)
- throws IOException {
+ CellScanner cellScanner, Condition condition, long nonceGroup,
+ ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
int countOfCompleteMutation = 0;
try {
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
List<Mutation> mutations = new ArrayList<>();
+ long nonce = HConstants.NO_NONCE;
for (ClientProtos.Action action: actions) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet());
}
- MutationType type = action.getMutation().getMutateType();
+ MutationProto mutation = action.getMutation();
+ MutationType type = mutation.getMutateType();
switch (type) {
case PUT:
- Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
+ Put put = ProtobufUtil.toPut(mutation, cellScanner);
++countOfCompleteMutation;
checkCellSizeLimit(region, put);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
mutations.add(put);
break;
case DELETE:
- Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
+ Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
++countOfCompleteMutation;
spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
mutations.add(del);
break;
case INCREMENT:
- Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
+ Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
+ nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
++countOfCompleteMutation;
checkCellSizeLimit(region, increment);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
mutations.add(increment);
break;
case APPEND:
- Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner);
+ Append append = ProtobufUtil.toAppend(mutation, cellScanner);
+ nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
++countOfCompleteMutation;
checkCellSizeLimit(region, append);
spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
@@ -655,7 +659,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
}
if (result == null) {
- result = region.checkAndMutate(checkAndMutate);
+ result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
}
@@ -910,21 +914,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
- final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
+ final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
// Just throw the exception. The exception will be caught and then added to region-level
// exception for RegionAction. Leaving the null to action result is ok since the null
// result is viewed as failure by hbase client. And the region-lever exception will be used
// to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
// AsyncBatchRpcRetryingCaller#onComplete for more details.
- doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
+ doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
}
private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
final OperationQuota quota, final List<ClientProtos.Action> mutations,
final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
try {
- doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
+ doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
+ spaceQuotaEnforcement, false);
} catch (IOException e) {
// Set the exception for each action. The mutations in same RegionAction are group to
// different batch and then be processed individually. Hence, we don't set the region-level
@@ -943,9 +948,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param mutations
*/
private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
- final OperationQuota quota, final List<ClientProtos.Action> mutations,
- final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
- throws IOException {
+ final OperationQuota quota, final List<ClientProtos.Action> mutations,
+ final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement,
+ boolean atomic) throws IOException {
Mutation[] mArray = new Mutation[mutations.size()];
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
@@ -959,6 +964,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
int i = 0;
+ long nonce = HConstants.NO_NONCE;
for (ClientProtos.Action action: mutations) {
if (action.hasGet()) {
throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
@@ -979,10 +985,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
case INCREMENT:
mutation = ProtobufUtil.toIncrement(m, cells);
+ nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
break;
case APPEND:
mutation = ProtobufUtil.toAppend(m, cells);
+ nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
break;
default:
@@ -1007,8 +1015,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
}
- OperationStatus[] codes = region.batchMutate(mArray, atomic, HConstants.NO_NONCE,
- HConstants.NO_NONCE);
+ OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
// When atomic is true, it indicates that the mutateRow API or the batch API with
// RowMutations is called. In this case, we need to merge the results of the
@@ -2760,7 +2767,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
- cellScanner, request.getCondition(), spaceQuotaEnforcement);
+ cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
responseBuilder.setProcessed(result.isSuccess());
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
@@ -2817,7 +2824,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (regionAction.getActionCount() == 1) {
CheckAndMutateResult result = checkAndMutate(region, quota,
regionAction.getAction(0).getMutation(), cellScanner,
- regionAction.getCondition(), spaceQuotaEnforcement);
+ regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(result.isSuccess());
resultOrExceptionOrBuilder.setIndex(0);
if (result.getResult() != null) {
@@ -2826,7 +2833,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
} else {
CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
- cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
+ cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(result.isSuccess());
for (int i = 0; i < regionAction.getActionCount(); i++) {
if (i == 0 && result.getResult() != null) {
@@ -2852,7 +2859,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
try {
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
- cellScanner, spaceQuotaEnforcement);
+ cellScanner, nonceGroup, spaceQuotaEnforcement);
regionActionResultBuilder.setProcessed(true);
// We no longer use MultiResponse#processed. Instead, we use
// RegionActionResult#processed. This is for backward compatibility for old clients.
@@ -2965,7 +2972,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (request.hasCondition()) {
CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
- request.getCondition(), spaceQuotaEnforcement);
+ request.getCondition(), nonceGroup, spaceQuotaEnforcement);
builder.setProcessed(result.isSuccess());
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
@@ -3049,11 +3056,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
- MutationProto mutation, CellScanner cellScanner, Condition condition,
+ MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
ActivePolicyEnforcement spaceQuota) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
cellScanner);
+ long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
quota.addMutation((Mutation) checkAndMutate.getAction());
@@ -3063,7 +3071,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
}
if (result == null) {
- result = region.checkAndMutate(checkAndMutate);
+ result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 10b358f..833ac84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -56,13 +59,17 @@ public class TestAsyncTableNoncedRetry {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static TableName TABLE_NAME = TableName.valueOf("async");
+ private static final TableName TABLE_NAME = TableName.valueOf("async");
- private static byte[] FAMILY = Bytes.toBytes("cf");
+ private static final byte[] FAMILY = Bytes.toBytes("cf");
- private static byte[] QUALIFIER = Bytes.toBytes("cq");
+ private static final byte[] QUALIFIER = Bytes.toBytes("cq");
- private static byte[] VALUE = Bytes.toBytes("value");
+ private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
+
+ private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
private static AsyncConnection ASYNC_CONN;
@@ -71,9 +78,14 @@ public class TestAsyncTableNoncedRetry {
private byte[] row;
- private static AtomicInteger CALLED = new AtomicInteger();
+ private static final AtomicInteger CALLED = new AtomicInteger();
+
+ private static final long SLEEP_TIME = 2000;
+
+ private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
- private static long SLEEP_TIME = 2000;
+ // The number of miniBatchOperations that are executed in a RegionServer
+ private static int miniBatchOperationCount;
public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
@@ -83,21 +95,12 @@ public class TestAsyncTableNoncedRetry {
}
@Override
- public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
- Result result) throws IOException {
- if (CALLED.getAndIncrement() == 0) {
- Threads.sleepWithoutInterrupt(SLEEP_TIME);
- }
- return RegionObserver.super.postAppend(c, append, result);
- }
-
- @Override
- public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c,
- Increment increment, Result result) throws IOException {
- if (CALLED.getAndIncrement() == 0) {
+ public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ // We sleep when the last of the miniBatchOperations is executed
+ if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
Threads.sleepWithoutInterrupt(SLEEP_TIME);
}
- return RegionObserver.super.postIncrement(c, increment, result);
}
}
@@ -128,8 +131,11 @@ public class TestAsyncTableNoncedRetry {
public void testAppend() throws InterruptedException, ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
- .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
@@ -140,9 +146,12 @@ public class TestAsyncTableNoncedRetry {
ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
- .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)
.setReturnResults(false)).get();
+
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isEmpty());
@@ -152,10 +161,14 @@ public class TestAsyncTableNoncedRetry {
public void testIncrement() throws InterruptedException, ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
- .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
- assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get();
+
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
+ assertEquals(1L, result);
}
@Test
@@ -163,11 +176,218 @@ public class TestAsyncTableNoncedRetry {
ExecutionException {
assertEquals(0, CALLED.get());
AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
- .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)
.setReturnResults(false)).get();
+
// make sure we called twice and the result is still correct
assertEquals(2, CALLED.get());
assertTrue(result.isEmpty());
}
+
+ @Test
+ public void testIncrementInRowMutations()
+ throws InterruptedException, ExecutionException, IOException {
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ Result result = table.mutateRow(new RowMutations(row)
+ .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+ .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+ }
+
+ @Test
+ public void testAppendInRowMutations()
+ throws InterruptedException, ExecutionException, IOException {
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ Result result = table.mutateRow(new RowMutations(row)
+ .add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
+ .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+ }
+
+ @Test
+ public void testIncrementAndAppendInRowMutations()
+ throws InterruptedException, ExecutionException, IOException {
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ Result result = table.mutateRow(new RowMutations(row)
+ .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+ .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get();
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+ assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
+ }
+
+ @Test
+ public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException {
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER2)
+ .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get();
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+ }
+
+ @Test
+ public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException {
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER2)
+ .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get();
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+ }
+
+ @Test
+ public void testIncrementInRowMutationsInCheckAndMutate() throws InterruptedException,
+ ExecutionException, IOException {
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER3)
+ .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+ .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get();
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+ }
+
+ @Test
+ public void testAppendInRowMutationsInCheckAndMutate() throws InterruptedException,
+ ExecutionException, IOException {
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER3)
+ .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
+ .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get();
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+ }
+
+ @Test
+ public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws InterruptedException,
+ ExecutionException, IOException {
+ assertEquals(0, CALLED.get());
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER3)
+ .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+ .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)))).get();
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
+ }
+
+ @Test
+ public void testBatch() throws InterruptedException,
+ ExecutionException, IOException {
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+ byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
+ byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
+ byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
+ byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
+
+ assertEquals(0, CALLED.get());
+
+ AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+ .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+ miniBatchOperationCount = 6;
+ List<Object> results = table.batchAll(Arrays.asList(
+ new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
+ new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
+ new RowMutations(row3)
+ .add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
+ .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
+ CheckAndMutate.newBuilder(row4)
+ .ifNotExists(FAMILY, QUALIFIER2)
+ .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
+ CheckAndMutate.newBuilder(row5)
+ .ifNotExists(FAMILY, QUALIFIER2)
+ .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
+ CheckAndMutate.newBuilder(row6)
+ .ifNotExists(FAMILY, QUALIFIER3)
+ .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
+ .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE))))).get();
+
+ // make sure we called twice and the result is still correct
+
+ // should be called 12 times as 6 miniBatchOperations are called twice
+ assertEquals(12, CALLED.get());
+
+ assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER));
+
+ assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER)));
+
+ assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER)));
+ assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2));
+
+ CheckAndMutateResult result;
+
+ result = (CheckAndMutateResult) results.get(3);
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+
+ result = (CheckAndMutateResult) results.get(4);
+ assertTrue(result.isSuccess());
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+
+ result = (CheckAndMutateResult) results.get(5);
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java
new file mode 100644
index 0000000..51ecba9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestHTableNoncedRetry {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHTableNoncedRetry.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static final byte[] QUALIFIER = Bytes.toBytes("cq");
+
+ private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
+
+ private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ private static Connection CONN;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private byte[] row;
+
+ private Table table;
+
+ private static final AtomicInteger CALLED = new AtomicInteger();
+
+ private static final int SLEEP_TIME = 2000;
+
+ private static final int RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
+
+ // The number of miniBatchOperations that are executed in a RegionServer
+ private static int miniBatchOperationCount;
+
+ public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ // We sleep when the last of the miniBatchOperations is executed
+ if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
+ Threads.sleepWithoutInterrupt(SLEEP_TIME);
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.getAdmin()
+ .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+ .setCoprocessor(SleepOnceCP.class.getName()).build());
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ Closeables.close(CONN, true);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws IOException, InterruptedException {
+ row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
+ CALLED.set(0);
+
+ table = CONN.getTable(TABLE_NAME);
+ table.setRpcTimeout(RPC_TIMEOUT);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ table.close();
+ }
+
+ @Test
+ public void testAppend() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+ }
+
+ @Test
+ public void testAppendWhenReturnResultsEqualsFalse() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)
+ .setReturnResults(false));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testIncrement() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L);
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertEquals(1L, result);
+ }
+
+ @Test
+ public void testIncrementWhenReturnResultsEqualsFalse() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)
+ .setReturnResults(false));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testIncrementInRowMutations() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ Result result = table.mutateRow(new RowMutations(row)
+ .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+ .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+ }
+
+ @Test
+ public void testAppendInRowMutations() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ Result result = table.mutateRow(new RowMutations(row)
+ .add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
+ .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+ }
+
+ @Test
+ public void testIncrementAndAppendInRowMutations() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ Result result = table.mutateRow(new RowMutations(row)
+ .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+ .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+ assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
+ }
+
+ @Test
+ public void testIncrementInCheckAndMutate() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER2)
+ .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+ }
+
+ @Test
+ public void testAppendInCheckAndMutate() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER2)
+ .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+ }
+
+ @Test
+ public void testIncrementInRowMutationsInCheckAndMutate() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER3)
+ .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+ .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+ }
+
+ @Test
+ public void testAppendInRowMutationsInCheckAndMutate() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER3)
+ .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
+ .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+ }
+
+ @Test
+ public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws IOException {
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 1;
+ CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifNotExists(FAMILY, QUALIFIER3)
+ .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+ .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))));
+
+ // make sure we called twice and the result is still correct
+ assertEquals(2, CALLED.get());
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
+ }
+
+ @Test
+ public void testBatch() throws IOException, InterruptedException {
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+ byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
+ byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
+ byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
+ byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
+
+ assertEquals(0, CALLED.get());
+
+ miniBatchOperationCount = 6;
+ Object[] results = new Object[6];
+ table.batch(Arrays.asList(
+ new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
+ new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
+ new RowMutations(row3)
+ .add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
+ .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
+ CheckAndMutate.newBuilder(row4)
+ .ifNotExists(FAMILY, QUALIFIER2)
+ .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
+ CheckAndMutate.newBuilder(row5)
+ .ifNotExists(FAMILY, QUALIFIER2)
+ .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
+ CheckAndMutate.newBuilder(row6)
+ .ifNotExists(FAMILY, QUALIFIER3)
+ .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
+ .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE)))), results);
+
+ // make sure we called twice and the result is still correct
+
+ // should be called 12 times as 6 miniBatchOperations are called twice
+ assertEquals(12, CALLED.get());
+
+ assertArrayEquals(VALUE, ((Result) results[0]).getValue(FAMILY, QUALIFIER));
+
+ assertEquals(1L, Bytes.toLong(((Result) results[1]).getValue(FAMILY, QUALIFIER)));
+
+ assertEquals(1L, Bytes.toLong(((Result) results[2]).getValue(FAMILY, QUALIFIER)));
+ assertArrayEquals(VALUE, ((Result) results[2]).getValue(FAMILY, QUALIFIER2));
+
+ CheckAndMutateResult result;
+
+ result = (CheckAndMutateResult) results[3];
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+
+ result = (CheckAndMutateResult) results[4];
+ assertTrue(result.isSuccess());
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+
+ result = (CheckAndMutateResult) results[5];
+ assertTrue(result.isSuccess());
+ assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+ assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
+ }
+}