Posted to commits@hbase.apache.org by br...@apache.org on 2021/03/21 13:48:26 UTC

[hbase] branch branch-2 updated: HBASE-25678 Support nonce operations for Increment/Append in RowMutations and CheckAndMutate (#3073)

This is an automated email from the ASF dual-hosted git repository.

brfrn169 pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2b5c2c3  HBASE-25678 Support nonce operations for Increment/Append in RowMutations and CheckAndMutate (#3073)
2b5c2c3 is described below

commit 2b5c2c36d99e8ecb7e7a345d83b1551999632f4a
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Sun Mar 21 22:47:53 2021 +0900

    HBASE-25678 Support nonce operations for Increment/Append in RowMutations and CheckAndMutate (#3073)
    
    Signed-off-by: stack <st...@apache.org>
---
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |  22 +-
 .../apache/hadoop/hbase/client/AsyncProcess.java   |  25 +-
 .../org/apache/hadoop/hbase/client/HTable.java     |  36 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  40 +--
 .../hbase/shaded/protobuf/RequestConverter.java    | 138 +++++---
 .../hadoop/hbase/coprocessor/RegionObserver.java   |   2 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  59 ++--
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  53 +--
 .../hbase/client/TestAsyncTableNoncedRetry.java    | 268 +++++++++++++--
 .../hadoop/hbase/client/TestHTableNoncedRetry.java | 365 +++++++++++++++++++++
 10 files changed, 850 insertions(+), 158 deletions(-)
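
For context, a minimal client-side sketch of what this change enables. This is illustrative only: the class name, table name "t", column family "cf" and qualifiers are placeholders, not part of the patch. With this change, an Increment/Append submitted through Table.mutateRow(RowMutations) or Table.checkAndMutate(CheckAndMutate) carries a client-generated nonce, so a client retry after an RPC timeout is not applied a second time on the RegionServer (the same usage appears in the new tests below).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class NoncedRowMutationsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    // Table "t" with column family "cf" is assumed to exist already.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("t"))) {

      // Increment packaged in RowMutations: the Increment now carries a nonce,
      // so a retried multi RPC does not bump the counter twice.
      Result r = table.mutateRow(new RowMutations(row)
        .add(new Increment(row).addColumn(cf, Bytes.toBytes("cq"), 1L))
        .add((Mutation) new Delete(row).addColumn(cf, Bytes.toBytes("cq2"))));
      System.out.println("counter=" + Bytes.toLong(r.getValue(cf, Bytes.toBytes("cq"))));

      // Increment guarded by a CheckAndMutate condition; also nonce-protected now.
      CheckAndMutateResult cr = table.checkAndMutate(CheckAndMutate.newBuilder(row)
        .ifNotExists(cf, Bytes.toBytes("cq3"))
        .build(new Increment(row).addColumn(cf, Bytes.toBytes("cq"), 1L)));
      System.out.println("success=" + cr.isSuccess());
    }
  }
}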

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 7e05b05..7af385d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -172,7 +172,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       } else {
         action = new Action(rawAction, i);
       }
-      if (rawAction instanceof Append || rawAction instanceof Increment) {
+      if (hasIncrementOrAppend(rawAction)) {
         action.setNonce(conn.getNonceGenerator().newNonce());
       }
       this.actions.add(action);
@@ -184,6 +184,26 @@ class AsyncBatchRpcRetryingCaller<T> {
     this.startNs = System.nanoTime();
   }
 
+  private static boolean hasIncrementOrAppend(Row action) {
+    if (action instanceof Append || action instanceof Increment) {
+      return true;
+    } else if (action instanceof RowMutations) {
+      return hasIncrementOrAppend((RowMutations) action);
+    } else if (action instanceof CheckAndMutate) {
+      return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
+    }
+    return false;
+  }
+
+  private static boolean hasIncrementOrAppend(RowMutations mutations) {
+    for (Mutation mutation : mutations.getMutations()) {
+      if (mutation instanceof Append || mutation instanceof Increment) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private long remainingTimeNs() {
     return operationTimeoutNs - (System.nanoTime() - startNs);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 8cd046f..6071cb6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -398,8 +398,29 @@ class AsyncProcess {
   }
 
   private void setNonce(NonceGenerator ng, Row r, Action action) {
-    if (!(r instanceof Append) && !(r instanceof Increment)) return;
-    action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
+    if (hasIncrementOrAppend(r)) {
+      action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
+    }
+  }
+
+  private static boolean hasIncrementOrAppend(Row action) {
+    if (action instanceof Append || action instanceof Increment) {
+      return true;
+    } else if (action instanceof RowMutations) {
+      return hasIncrementOrAppend((RowMutations) action);
+    } else if (action instanceof CheckAndMutate) {
+      return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
+    }
+    return false;
+  }
+
+  private static boolean hasIncrementOrAppend(RowMutations mutations) {
+    for (Mutation mutation : mutations.getMutations()) {
+      if (mutation instanceof Append || mutation instanceof Increment) {
+        return true;
+      }
+    }
+    return false;
   }
 
   private int checkTimeout(String name, int timeout) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a219fed..a04fd26 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -555,17 +554,16 @@ public class HTable implements Table {
 
   @Override
   public Result mutateRow(final RowMutations rm) throws IOException {
+    long nonceGroup = getNonceGroup();
+    long nonce = getNonce();
     CancellableRegionServerCallable<MultiResponse> callable =
       new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
           rpcControllerFactory.newController(), writeRpcTimeoutMs,
           new RetryingTimeTracker().start(), rm.getMaxPriority()) {
       @Override
       protected MultiResponse rpcCall() throws Exception {
-        RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
-            getLocation().getRegionInfo().getRegionName(), rm);
-        regionMutationBuilder.setAtomic(true);
-        MultiRequest request =
-            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+        MultiRequest request = RequestConverter.buildMultiRequest(
+          getLocation().getRegionInfo().getRegionName(), rm, nonceGroup, nonce);
         ClientProtos.MultiResponse response = doMulti(request);
         ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
         if (res.hasException()) {
@@ -597,6 +595,14 @@ public class HTable implements Table {
     return (Result) results[0];
   }
 
+  private long getNonceGroup() {
+    return ((ClusterConnection) getConnection()).getNonceGenerator().getNonceGroup();
+  }
+
+  private long getNonce() {
+    return ((ClusterConnection) getConnection()).getNonceGenerator().newNonce();
+  }
+
   @Override
   public Result append(final Append append) throws IOException {
     checkHasFamilies(append);
@@ -606,7 +612,8 @@ public class HTable implements Table {
       @Override
       protected Result rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
+          getLocation().getRegionInfo().getRegionName(), append, super.getNonceGroup(),
+          super.getNonce());
         MutateResponse response = doMutate(request);
         if (!response.hasResult()) return null;
         return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
@@ -625,7 +632,8 @@ public class HTable implements Table {
       @Override
       protected Result rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
+          getLocation().getRegionInfo().getRegionName(), increment, super.getNonceGroup(),
+          super.getNonce());
         MutateResponse response = doMutate(request);
         // Should this check for null like append does?
         return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
@@ -664,7 +672,7 @@ public class HTable implements Table {
       protected Long rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildIncrementRequest(
           getLocation().getRegionInfo().getRegionName(), row, family,
-          qualifier, amount, durability, getNonceGroup(), getNonce());
+          qualifier, amount, durability, super.getNonceGroup(), super.getNonce());
         MutateResponse response = doMutate(request);
         Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
         return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
@@ -737,6 +745,8 @@ public class HTable implements Table {
   private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
     final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
     final TimeRange timeRange, final RowMutations rm) throws IOException {
+    long nonceGroup = getNonceGroup();
+    long nonce = getNonce();
     CancellableRegionServerCallable<MultiResponse> callable =
     new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
     rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
@@ -744,8 +754,8 @@ public class HTable implements Table {
       @Override
       protected MultiResponse rpcCall() throws Exception {
         MultiRequest request = RequestConverter
-          .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
-            qualifier, op, value, filter, timeRange, rm);
+          .buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row, family,
+            qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce);
         ClientProtos.MultiResponse response = doMulti(request);
         ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
         if (res.hasException()) {
@@ -822,6 +832,8 @@ public class HTable implements Table {
   private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
     final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
     final TimeRange timeRange, final Mutation mutation) throws IOException {
+    long nonceGroup = getNonceGroup();
+    long nonce = getNonce();
     ClientServiceCallable<CheckAndMutateResult> callable =
       new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
         this.rpcControllerFactory.newController(), mutation.getPriority()) {
@@ -829,7 +841,7 @@ public class HTable implements Table {
         protected CheckAndMutateResult rpcCall() throws Exception {
           MutateRequest request = RequestConverter.buildMutateRequest(
             getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
-            filter, timeRange, mutation);
+            filter, timeRange, mutation, nonceGroup, nonce);
           MutateResponse response = doMutate(request);
           if (response.hasResult()) {
             return new CheckAndMutateResult(response.getProcessed(),
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 1222d83..bed896e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 
 /**
  * The implementation of RawAsyncTable.
@@ -362,7 +361,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
         .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
           stub, put,
           (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
-            null, timeRange, p),
+            null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
           (c, r) -> r.getProcessed()))
         .call();
     }
@@ -374,7 +373,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
         .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
           loc, stub, delete,
           (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
-            null, timeRange, d),
+            null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
           (c, r) -> r.getProcessed()))
         .call();
     }
@@ -387,8 +386,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
         rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
           loc, stub, mutation,
-          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
-            null, timeRange, rm), CheckAndMutateResult::isSuccess))
+          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
+            null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+          CheckAndMutateResult::isSuccess))
         .call();
     }
   }
@@ -425,7 +425,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
         .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
           stub, put,
           (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
-            filter, timeRange, p),
+            filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
           (c, r) -> r.getProcessed()))
         .call();
     }
@@ -436,7 +436,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
         .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
           loc, stub, delete,
           (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
-            filter, timeRange, d),
+            filter, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
           (c, r) -> r.getProcessed()))
         .call();
     }
@@ -448,8 +448,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
         rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
           loc, stub, mutation,
-          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
-            filter, timeRange, rm), CheckAndMutateResult::isSuccess))
+          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null,
+            filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
+          CheckAndMutateResult::isSuccess))
         .call();
     }
   }
@@ -468,6 +469,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
       if (mutation instanceof Put) {
         validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
       }
+      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+      long nonce = conn.getNonceGenerator().newNonce();
       return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
         mutation.getPriority(), rpcTimeoutNs)
         .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
@@ -475,21 +478,23 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
           (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
             checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
             checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
-            checkAndMutate.getTimeRange(), m),
+            checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
           (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
         .call();
     } else if (checkAndMutate.getAction() instanceof RowMutations) {
       RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
       validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
+      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+      long nonce = conn.getNonceGenerator().newNonce();
       return RawAsyncTableImpl.this.<CheckAndMutateResult> newCaller(checkAndMutate.getRow(),
         rowMutations.getMaxPriority(), rpcTimeoutNs)
         .action((controller, loc, stub) ->
           RawAsyncTableImpl.this.<CheckAndMutateResult, CheckAndMutateResult> mutateRow(
             controller, loc, stub, rowMutations,
-            (rn, rm) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
+            (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
             checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
             checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
-            checkAndMutate.getTimeRange(), rm),
+            checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
             resp -> resp))
         .call();
     } else {
@@ -554,16 +559,13 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public CompletableFuture<Result> mutateRow(RowMutations mutations) {
     validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
+    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+    long nonce = conn.getNonceGenerator().newNonce();
     return this.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(),
       writeRpcTimeoutNs).action((controller, loc, stub) ->
         this.<Result, Result> mutateRow(controller, loc, stub, mutations,
-          (rn, rm) -> {
-            RegionAction.Builder regionMutationBuilder = RequestConverter
-              .buildRegionAction(rn, rm);
-            regionMutationBuilder.setAtomic(true);
-            return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build())
-              .build();
-          }, resp -> resp))
+          (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
+          resp -> resp))
       .call();
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 54b55cc..21644f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -247,39 +247,78 @@ public final class RequestConverter {
    */
   public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
     final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
-    final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
-    return MutateRequest.newBuilder()
-      .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
-      .setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
+    final Filter filter, final TimeRange timeRange, final Mutation mutation, long nonceGroup,
+    long nonce) throws IOException {
+    MutateRequest.Builder builder = MutateRequest.newBuilder();
+    if (mutation instanceof Increment || mutation instanceof Append) {
+      builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation, nonce))
+        .setNonceGroup(nonceGroup);
+    } else {
+      builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation));
+    }
+    return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
       .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
       .build();
   }
 
   /**
-   * Create a protocol buffer MutateRequest for conditioned row mutations
+   * Create a protocol buffer MultiRequest for conditioned row mutations
    *
-   * @return a mutate request
+   * @return a multi request
    * @throws IOException
    */
-  public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
+  public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
     final byte[] row, final byte[] family, final byte[] qualifier,
     final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
-    final RowMutations rowMutations) throws IOException {
+    final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
+    return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op,
+      value, filter, timeRange), nonceGroup, nonce);
+  }
+
+  /**
+   * Create a protocol buffer MultiRequest for row mutations
+   *
+   * @return a multi request
+   */
+  public static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
+    final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
+    return buildMultiRequest(regionName, rowMutations, null, nonceGroup, nonce);
+  }
+
+  private static ClientProtos.MultiRequest buildMultiRequest(final byte[] regionName,
+    final RowMutations rowMutations, final Condition condition, long nonceGroup, long nonce)
+    throws IOException {
     RegionAction.Builder builder =
-        getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
+      getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
     builder.setAtomic(true);
+
+    boolean hasNonce = false;
     ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
     MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
     for (Mutation mutation: rowMutations.getMutations()) {
       mutationBuilder.clear();
-      MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
-        mutationBuilder);
+      MutationProto mp;
+      if (mutation instanceof Increment || mutation instanceof Append) {
+        mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder, nonce);
+        hasNonce = true;
+      } else {
+        mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation, mutationBuilder);
+      }
       actionBuilder.clear();
       actionBuilder.setMutation(mp);
       builder.addAction(actionBuilder.build());
     }
-    return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.setCondition(
-      buildCondition(row, family, qualifier, op, value, filter, timeRange)).build()).build();
+
+    if (condition != null) {
+      builder.setCondition(condition);
+    }
+
+    MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
+    if (hasNonce) {
+      multiRequestBuilder.setNonceGroup(nonceGroup);
+    }
+
+    return multiRequestBuilder.addRegionAction(builder.build()).build();
   }
 
   /**
@@ -362,33 +401,6 @@ public final class RequestConverter {
     return builder.build();
   }
 
-  /**
-   * Create a protocol buffer MultiRequest for row mutations.
-   * Does not propagate Action absolute position.  Does not set atomic action on the created
-   * RegionAtomic.  Caller should do that if wanted.
-   * @param regionName
-   * @param rowMutations
-   * @return a data-laden RegionMutation.Builder
-   * @throws IOException
-   */
-  public static RegionAction.Builder buildRegionAction(final byte [] regionName,
-      final RowMutations rowMutations)
-  throws IOException {
-    RegionAction.Builder builder =
-      getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
-    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
-    MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
-    for (Mutation mutation: rowMutations.getMutations()) {
-      mutationBuilder.clear();
-      MutationProto mp = ProtobufUtil.toMutation(getMutationType(mutation), mutation,
-        mutationBuilder);
-      actionBuilder.clear();
-      actionBuilder.setMutation(mp);
-      builder.addAction(actionBuilder.build());
-    }
-    return builder;
-  }
-
   public static RegionAction.Builder getRegionActionBuilderWithRegion(
       final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
     RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
@@ -785,9 +797,6 @@ public final class RequestConverter {
         throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }
     }
-    if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
-      multiRequestBuilder.setNonceGroup(nonceGroup);
-    }
     if (builder.getActionCount() > 0) {
       multiRequestBuilder.addRegionAction(builder.build());
     }
@@ -801,8 +810,11 @@ public final class RequestConverter {
       builder.clear();
       getRegionActionBuilderWithRegion(builder, regionName);
 
-      buildNoDataRegionAction((RowMutations) action.getAction(), cells, builder, actionBuilder,
-        mutationBuilder);
+      boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) action.getAction(),
+        cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
+      if (hasIncrementOrAppend) {
+        hasNonce = true;
+      }
       builder.setAtomic(true);
 
       multiRequestBuilder.addRegionAction(builder.build());
@@ -836,16 +848,21 @@ public final class RequestConverter {
       } else if (cam.getAction() instanceof Increment) {
         actionBuilder.clear();
         mutationBuilder.clear();
-        buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
+        buildNoDataRegionAction((Increment) cam.getAction(), cells, action.getNonce(), builder,
           actionBuilder, mutationBuilder);
+        hasNonce = true;
       } else if (cam.getAction() instanceof Append) {
         actionBuilder.clear();
         mutationBuilder.clear();
-        buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
+        buildNoDataRegionAction((Append) cam.getAction(), cells, action.getNonce(), builder,
           actionBuilder, mutationBuilder);
+        hasNonce = true;
       } else if (cam.getAction() instanceof RowMutations) {
-        buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
-          mutationBuilder);
+        boolean hasIncrementOrAppend = buildNoDataRegionAction((RowMutations) cam.getAction(),
+          cells, action.getNonce(), builder, actionBuilder, mutationBuilder);
+        if (hasIncrementOrAppend) {
+          hasNonce = true;
+        }
         builder.setAtomic(true);
       } else {
         throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
@@ -858,6 +875,10 @@ public final class RequestConverter {
       // in the overall multiRequest.
       indexMap.put(multiRequestBuilder.getRegionActionCount() - 1, action.getOriginalIndex());
     }
+
+    if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
+      multiRequestBuilder.setNonceGroup(nonceGroup);
+    }
   }
 
   private static void buildNoDataRegionAction(final Put put, final List<CellScannable> cells,
@@ -907,18 +928,29 @@ public final class RequestConverter {
       MutationType.APPEND, append, mutationBuilder, nonce)));
   }
 
-  private static void buildNoDataRegionAction(final RowMutations rowMutations,
-    final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
+  /**
+   * @return whether or not the rowMutations has a Increment or Append
+   */
+  private static boolean buildNoDataRegionAction(final RowMutations rowMutations,
+    final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
     final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
     throws IOException {
+    boolean ret = false;
     for (Mutation mutation: rowMutations.getMutations()) {
       mutationBuilder.clear();
-      MutationProto mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation,
-        mutationBuilder);
+      MutationProto mp;
+      if (mutation instanceof Increment || mutation instanceof Append) {
+        mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder,
+          nonce);
+        ret = true;
+      } else {
+        mp = ProtobufUtil.toMutationNoData(getMutationType(mutation), mutation, mutationBuilder);
+      }
       cells.add(mutation);
       actionBuilder.clear();
       regionActionBuilder.addAction(actionBuilder.setMutation(mp).build());
     }
+    return ret;
   }
 
   private static MutationType getMutationType(Mutation mutation) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 2bd7665..a4f3f08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1551,7 +1551,7 @@ public interface RegionObserver {
     List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
     for (Pair<Cell, Cell> pair : cellPairs) {
       resultPairs.add(new Pair<>(pair.getFirst(),
-          postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
+          postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, pair.getFirst(),
               pair.getSecond())));
     }
     return resultPairs;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5bfeff7..9220b2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3259,8 +3259,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     protected final Map<byte[], List<Cell>>[] familyCellMaps;
     // For Increment/Append operations
     protected final Result[] results;
-    // For nonce operations
-    protected final boolean[] canProceed;
 
     protected final HRegion region;
     protected int nextIndexToProcess = 0;
@@ -3276,7 +3274,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.walEditsFromCoprocessors = new WALEdit[operations.length];
       familyCellMaps = new Map[operations.length];
       this.results = new Result[operations.length];
-      this.canProceed = new boolean[operations.length];
 
       this.region = region;
       observedExceptions = new ObservedExceptionsInBatch();
@@ -3698,9 +3695,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   private static class MutationBatchOperation extends BatchOperation<Mutation> {
 
+    // For nonce operations
     private long nonceGroup;
-
     private long nonce;
+    protected boolean canProceed;
 
     public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
       long nonceGroup, long nonce) {
@@ -3813,6 +3811,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     @Override
     public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
         long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
+      // For nonce operations
+      canProceed = startNonceOperation();
+
       visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
         Mutation mutation = getMutation(index);
         if (mutation instanceof Put) {
@@ -3831,8 +3832,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
 
           // For nonce operations
-          canProceed[index] = startNonceOperation(nonceGroup, nonce);
-          if (!canProceed[index]) {
+          if (!canProceed) {
             Result result;
             if (returnResults) {
               // convert duplicate increment/append to get
@@ -3894,11 +3894,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     /**
      * Starts the nonce operation for a mutation, if needed.
-     * @param nonceGroup Nonce group from the request.
-     * @param nonce Nonce.
      * @return whether to proceed this mutation.
      */
-    private boolean startNonceOperation(long nonceGroup, long nonce) throws IOException {
+    private boolean startNonceOperation() throws IOException {
       if (region.rsServices == null || region.rsServices.getNonceManager() == null
         || nonce == HConstants.NO_NONCE) {
         return true;
@@ -3915,11 +3913,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     /**
      * Ends nonce operation for a mutation, if needed.
-     * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
-     * @param nonce Nonce.
      * @param success Whether the operation for this nonce has succeeded.
      */
-    private void endNonceOperation(long nonceGroup, long nonce, boolean success) {
+    private void endNonceOperation(boolean success) {
       if (region.rsServices != null && region.rsServices.getNonceManager() != null
         && nonce != HConstants.NO_NONCE) {
         region.rsServices.getNonceManager().endOperation(nonceGroup, nonce, success);
@@ -4188,13 +4184,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
 
         // For nonce operations
-        visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
-          if (canProceed[i]) {
-            endNonceOperation(nonceGroup, nonce,
-              retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS);
-          }
-          return true;
-        });
+        if (canProceed && nonce != HConstants.NO_NONCE) {
+          boolean[] areAllIncrementsAndAppendsSuccessful = new boolean[]{true};
+          visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
+            Mutation mutation = getMutation(i);
+            if (mutation instanceof Increment || mutation instanceof Append) {
+              if (retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+                areAllIncrementsAndAppendsSuccessful[0] = false;
+                return false;
+              }
+            }
+            return true;
+          });
+          endNonceOperation(areAllIncrementsAndAppendsSuccessful[0]);
+        }
 
         // See if the column families were consistent through the whole thing.
         // if they were then keep them. If they were not then pass a null.
@@ -4452,7 +4455,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  private OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
+  public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
     long nonce) throws IOException {
     // As it stands, this is used for 3 things
     // * batchMutate with single mutation - put/delete/increment/append, separate or from
@@ -4748,6 +4751,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+    return checkAndMutate(checkAndMutate, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate, long nonceGroup,
+    long nonce) throws IOException {
     byte[] row = checkAndMutate.getRow();
     Filter filter = null;
     byte[] family = null;
@@ -4859,9 +4867,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // All edits for the given row (across all column families) must happen atomically.
           Result r;
           if (mutation != null) {
-            r = mutate(mutation, true).getResult();
+            r = mutate(mutation, true, nonceGroup, nonce).getResult();
           } else {
-            r = mutateRow(rowMutations);
+            r = mutateRow(rowMutations, nonceGroup, nonce);
           }
           this.checkAndMutateChecksPassed.increment();
           return new CheckAndMutateResult(true, r);
@@ -7517,9 +7525,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public Result mutateRow(RowMutations rm) throws IOException {
+    return mutateRow(rm, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  public Result mutateRow(RowMutations rm, long nonceGroup, long nonce) throws IOException {
     final List<Mutation> m = rm.getMutations();
-    OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true,
-      HConstants.NO_NONCE, HConstants.NO_NONCE);
+    OperationStatus[] statuses = batchMutate(m.toArray(new Mutation[0]), true, nonceGroup, nonce);
 
     List<Result> results = new ArrayList<>();
     for (OperationStatus status : statuses) {
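
A rough, standalone model of the nonce bookkeeping the HRegion changes above rely on (startNonceOperation/endNonceOperation). This is a simplified sketch under assumed semantics, not the actual ServerNonceManager: the real manager also waits on in-flight operations, expires old nonces, and handles nonces recovered from the WAL.

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: tracks (nonceGroup, nonce) pairs so a retried, already-committed
// Increment/Append batch is not re-applied.
public class SimpleNonceTracker {
  private enum State { IN_FLIGHT, SUCCEEDED, FAILED }

  private static final class Key {
    final long group;
    final long nonce;
    Key(long group, long nonce) { this.group = group; this.nonce = nonce; }
    @Override public boolean equals(Object o) {
      return o instanceof Key && ((Key) o).group == group && ((Key) o).nonce == nonce;
    }
    @Override public int hashCode() { return Objects.hash(group, nonce); }
  }

  private final Map<Key, State> ops = new ConcurrentHashMap<>();

  /** @return true if the caller should proceed; false if this nonce already succeeded. */
  public boolean startOperation(long group, long nonce) {
    State prev = ops.putIfAbsent(new Key(group, nonce), State.IN_FLIGHT);
    // Simplification: an IN_FLIGHT or FAILED previous attempt is allowed to proceed here;
    // the real manager waits for in-flight attempts to finish before deciding.
    return prev != State.SUCCEEDED;
  }

  public void endOperation(long group, long nonce, boolean success) {
    ops.put(new Key(group, nonce), success ? State.SUCCEEDED : State.FAILED);
  }
}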
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7362a0e..a5cdccb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -598,43 +598,47 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
-    CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement)
-    throws IOException {
+    CellScanner cellScanner, Condition condition, long nonceGroup,
+    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
     int countOfCompleteMutation = 0;
     try {
       if (!region.getRegionInfo().isMetaRegion()) {
         regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
       }
       List<Mutation> mutations = new ArrayList<>();
+      long nonce = HConstants.NO_NONCE;
       for (ClientProtos.Action action: actions) {
         if (action.hasGet()) {
           throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
             action.getGet());
         }
-        MutationType type = action.getMutation().getMutateType();
+        MutationProto mutation = action.getMutation();
+        MutationType type = mutation.getMutateType();
         switch (type) {
           case PUT:
-            Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
+            Put put = ProtobufUtil.toPut(mutation, cellScanner);
             ++countOfCompleteMutation;
             checkCellSizeLimit(region, put);
             spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
             mutations.add(put);
             break;
           case DELETE:
-            Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
+            Delete del = ProtobufUtil.toDelete(mutation, cellScanner);
             ++countOfCompleteMutation;
             spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
             mutations.add(del);
             break;
           case INCREMENT:
-            Increment increment = ProtobufUtil.toIncrement(action.getMutation(), cellScanner);
+            Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
+            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
             ++countOfCompleteMutation;
             checkCellSizeLimit(region, increment);
             spaceQuotaEnforcement.getPolicyEnforcement(region).check(increment);
             mutations.add(increment);
             break;
           case APPEND:
-            Append append = ProtobufUtil.toAppend(action.getMutation(), cellScanner);
+            Append append = ProtobufUtil.toAppend(mutation, cellScanner);
+            nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
             ++countOfCompleteMutation;
             checkCellSizeLimit(region, append);
             spaceQuotaEnforcement.getPolicyEnforcement(region).check(append);
@@ -654,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
         }
         if (result == null) {
-          result = region.checkAndMutate(checkAndMutate);
+          result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
           if (region.getCoprocessorHost() != null) {
             result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
           }
@@ -909,21 +913,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
   private void doAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
     final OperationQuota quota, final List<ClientProtos.Action> mutations,
-    final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement)
+    final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement)
     throws IOException {
     // Just throw the exception. The exception will be caught and then added to region-level
     // exception for RegionAction. Leaving the null to action result is ok since the null
     // result is viewed as failure by hbase client. And the region-lever exception will be used
     // to replaced the null result. see AsyncRequestFutureImpl#receiveMultiAction and
     // AsyncBatchRpcRetryingCaller#onComplete for more details.
-    doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, true);
+    doBatchOp(builder, region, quota, mutations, cells, nonceGroup, spaceQuotaEnforcement, true);
   }
 
   private void doNonAtomicBatchOp(final RegionActionResult.Builder builder, final HRegion region,
     final OperationQuota quota, final List<ClientProtos.Action> mutations,
     final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
     try {
-      doBatchOp(builder, region, quota, mutations, cells, spaceQuotaEnforcement, false);
+      doBatchOp(builder, region, quota, mutations, cells, HConstants.NO_NONCE,
+        spaceQuotaEnforcement, false);
     } catch (IOException e) {
       // Set the exception for each action. The mutations in same RegionAction are group to
       // different batch and then be processed individually. Hence, we don't set the region-level
@@ -942,9 +947,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @param mutations
    */
   private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
-      final OperationQuota quota, final List<ClientProtos.Action> mutations,
-      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
-      throws IOException {
+    final OperationQuota quota, final List<ClientProtos.Action> mutations,
+    final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement,
+    boolean atomic) throws IOException {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
@@ -958,6 +963,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
        */
       Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
       int i = 0;
+      long nonce = HConstants.NO_NONCE;
       for (ClientProtos.Action action: mutations) {
         if (action.hasGet()) {
           throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
@@ -978,10 +984,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
           case INCREMENT:
             mutation = ProtobufUtil.toIncrement(m, cells);
+            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
             break;
 
           case APPEND:
             mutation = ProtobufUtil.toAppend(m, cells);
+            nonce = m.hasNonce() ? m.getNonce() : HConstants.NO_NONCE;
             break;
 
           default:
@@ -1006,7 +1014,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
       }
 
-      OperationStatus[] codes = region.batchMutate(mArray, atomic);
+      OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
 
       // When atomic is true, it indicates that the mutateRow API or the batch API with
       // RowMutations is called. In this case, we need to merge the results of the
@@ -2758,7 +2766,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
       try {
         CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
-          cellScanner, request.getCondition(), spaceQuotaEnforcement);
+          cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
         responseBuilder.setProcessed(result.isSuccess());
         ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
           ClientProtos.ResultOrException.newBuilder();
@@ -2815,7 +2823,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             if (regionAction.getActionCount() == 1) {
               CheckAndMutateResult result = checkAndMutate(region, quota,
                 regionAction.getAction(0).getMutation(), cellScanner,
-                regionAction.getCondition(), spaceQuotaEnforcement);
+                regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
               regionActionResultBuilder.setProcessed(result.isSuccess());
               resultOrExceptionOrBuilder.setIndex(0);
               if (result.getResult() != null) {
@@ -2824,7 +2832,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
             } else {
               CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
-                cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
+                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
               regionActionResultBuilder.setProcessed(result.isSuccess());
               for (int i = 0; i < regionAction.getActionCount(); i++) {
                 if (i == 0 && result.getResult() != null) {
@@ -2850,7 +2858,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
           try {
             doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
-              cellScanner, spaceQuotaEnforcement);
+              cellScanner, nonceGroup, spaceQuotaEnforcement);
             regionActionResultBuilder.setProcessed(true);
             // We no longer use MultiResponse#processed. Instead, we use
             // RegionActionResult#processed. This is for backward compatibility for old clients.
@@ -2963,7 +2971,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
       if (request.hasCondition()) {
         CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
-          request.getCondition(), spaceQuotaEnforcement);
+          request.getCondition(), nonceGroup, spaceQuotaEnforcement);
         builder.setProcessed(result.isSuccess());
         boolean clientCellBlockSupported = isClientCellBlockSupport(context);
         addResult(builder, result.getResult(), controller, clientCellBlockSupported);
@@ -3047,11 +3055,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
-    MutationProto mutation, CellScanner cellScanner, Condition condition,
+    MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup,
     ActivePolicyEnforcement spaceQuota) throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
       cellScanner);
+    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
     checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
     spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
     quota.addMutation((Mutation) checkAndMutate.getAction());
@@ -3061,7 +3070,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
     }
     if (result == null) {
-      result = region.checkAndMutate(checkAndMutate);
+      result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
       if (region.getCoprocessorHost() != null) {
         result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 82cc1a8..62a4499 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -57,13 +60,17 @@ public class TestAsyncTableNoncedRetry {
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-  private static TableName TABLE_NAME = TableName.valueOf("async");
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
 
-  private static byte[] FAMILY = Bytes.toBytes("cf");
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
 
-  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
 
-  private static byte[] VALUE = Bytes.toBytes("value");
+  private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
+
+  private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
 
   private static AsyncConnection ASYNC_CONN;
 
@@ -72,9 +79,14 @@ public class TestAsyncTableNoncedRetry {
 
   private byte[] row;
 
-  private static AtomicInteger CALLED = new AtomicInteger();
+  private static final AtomicInteger CALLED = new AtomicInteger();
+
+  private static final long SLEEP_TIME = 2000;
+
+  private static final long RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
 
-  private static long SLEEP_TIME = 2000;
+  // The number of miniBatchOperations that are executed in a RegionServer
+  private static int miniBatchOperationCount;
 
   public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
 
@@ -84,21 +96,12 @@ public class TestAsyncTableNoncedRetry {
     }
 
     @Override
-    public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
-        Result result) throws IOException {
-      if (CALLED.getAndIncrement() == 0) {
-        Threads.sleepWithoutInterrupt(SLEEP_TIME);
-      }
-      return RegionObserver.super.postAppend(c, append, result);
-    }
-
-    @Override
-    public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c,
-        Increment increment, Result result) throws IOException {
-      if (CALLED.getAndIncrement() == 0) {
+    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+      // We sleep when the last of the miniBatchOperations is executed
+      if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
         Threads.sleepWithoutInterrupt(SLEEP_TIME);
       }
-      return RegionObserver.super.postIncrement(c, increment, result);
     }
   }
 
@@ -129,8 +132,11 @@ public class TestAsyncTableNoncedRetry {
   public void testAppend() throws InterruptedException, ExecutionException {
     assertEquals(0, CALLED.get());
     AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
-      .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
     Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+
     // make sure we called twice and the result is still correct
     assertEquals(2, CALLED.get());
     assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
@@ -141,9 +147,12 @@ public class TestAsyncTableNoncedRetry {
     ExecutionException {
     assertEquals(0, CALLED.get());
     AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
-      .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
     Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)
       .setReturnResults(false)).get();
+
     // make sure we called twice and the result is still correct
     assertEquals(2, CALLED.get());
     assertTrue(result.isEmpty());
@@ -153,10 +162,14 @@ public class TestAsyncTableNoncedRetry {
   public void testIncrement() throws InterruptedException, ExecutionException {
     assertEquals(0, CALLED.get());
     AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
-      .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
-    assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get();
+
     // make sure we called twice and the result is still correct
     assertEquals(2, CALLED.get());
+    assertEquals(1L, result);
   }
 
   @Test
@@ -164,11 +177,218 @@ public class TestAsyncTableNoncedRetry {
     ExecutionException {
     assertEquals(0, CALLED.get());
     AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
-      .setRpcTimeout(SLEEP_TIME / 2, TimeUnit.MILLISECONDS).build();
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
     Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)
       .setReturnResults(false)).get();
+
     // make sure we called twice and the result is still correct
     assertEquals(2, CALLED.get());
     assertTrue(result.isEmpty());
   }
+
+  @Test
+  public void testIncrementInRowMutations()
+    throws InterruptedException, ExecutionException, IOException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    Result result = table.mutateRow(new RowMutations(row)
+      .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+      .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+  }
+
+  @Test
+  public void testAppendInRowMutations()
+    throws InterruptedException, ExecutionException, IOException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    Result result = table.mutateRow(new RowMutations(row)
+      .add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
+      .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))).get();
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+  }
+
+  @Test
+  public void testIncrementAndAppendInRowMutations()
+    throws InterruptedException, ExecutionException, IOException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setWriteRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    Result result = table.mutateRow(new RowMutations(row)
+      .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+      .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))).get();
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
+  }
+
+  @Test
+  public void testIncrementInCheckAndMutate() throws InterruptedException, ExecutionException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER2)
+      .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))).get();
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+  }
+
+  @Test
+  public void testAppendInCheckAndMutate() throws InterruptedException, ExecutionException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER2)
+      .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))).get();
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+  }
+
+  @Test
+  public void testIncrementInRowMutationsInCheckAndMutate() throws InterruptedException,
+    ExecutionException, IOException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER3)
+      .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+        .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get();
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+  }
+
+  @Test
+  public void testAppendInRowMutationsInCheckAndMutate() throws InterruptedException,
+    ExecutionException, IOException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER3)
+      .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
+        .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)))).get();
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+  }
+
+  @Test
+  public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws InterruptedException,
+    ExecutionException, IOException {
+    assertEquals(0, CALLED.get());
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER3)
+      .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+        .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)))).get();
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
+  }
+
+  @Test
+  public void testBatch() throws InterruptedException,
+    ExecutionException, IOException {
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
+    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
+    byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
+    byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
+
+    assertEquals(0, CALLED.get());
+
+    AsyncTable<?> table = ASYNC_CONN.getTableBuilder(TABLE_NAME)
+      .setRpcTimeout(RPC_TIMEOUT, TimeUnit.MILLISECONDS).build();
+
+    miniBatchOperationCount = 6;
+    List<Object> results = table.batchAll(Arrays.asList(
+      new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
+      new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
+      new RowMutations(row3)
+        .add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
+        .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
+      CheckAndMutate.newBuilder(row4)
+        .ifNotExists(FAMILY, QUALIFIER2)
+        .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
+      CheckAndMutate.newBuilder(row5)
+        .ifNotExists(FAMILY, QUALIFIER2)
+        .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
+      CheckAndMutate.newBuilder(row6)
+        .ifNotExists(FAMILY, QUALIFIER3)
+        .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
+          .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE))))).get();
+
+    // make sure the retry happened and the results are still correct: the 6
+    // miniBatchOperations are each observed twice (the timed-out first attempt plus the
+    // retry), so the hook should have been called 12 times
+    assertEquals(12, CALLED.get());
+
+    assertArrayEquals(VALUE, ((Result) results.get(0)).getValue(FAMILY, QUALIFIER));
+
+    assertEquals(1L, Bytes.toLong(((Result) results.get(1)).getValue(FAMILY, QUALIFIER)));
+
+    assertEquals(1L, Bytes.toLong(((Result) results.get(2)).getValue(FAMILY, QUALIFIER)));
+    assertArrayEquals(VALUE, ((Result) results.get(2)).getValue(FAMILY, QUALIFIER2));
+
+    CheckAndMutateResult result;
+
+    result = (CheckAndMutateResult) results.get(3);
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+
+    result = (CheckAndMutateResult) results.get(4);
+    assertTrue(result.isSuccess());
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+
+    result = (CheckAndMutateResult) results.get(5);
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java
new file mode 100644
index 0000000..51ecba9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableNoncedRetry.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestHTableNoncedRetry {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHTableNoncedRetry.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static final byte[] QUALIFIER2 = Bytes.toBytes("cq2");
+
+  private static final byte[] QUALIFIER3 = Bytes.toBytes("cq3");
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static Connection CONN;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private byte[] row;
+
+  private Table table;
+
+  private static final AtomicInteger CALLED = new AtomicInteger();
+
+  private static final int SLEEP_TIME = 2000;
+
+  private static final int RPC_TIMEOUT = SLEEP_TIME / 4 * 3; // three fourths of the sleep time
+
+  // The number of miniBatchOperations that are executed in a RegionServer
+  private static int miniBatchOperationCount;
+
+  public static final class SleepOnceCP implements RegionObserver, RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+      // We sleep when the last of the miniBatchOperations is executed
+      if (CALLED.getAndIncrement() == miniBatchOperationCount - 1) {
+        Threads.sleepWithoutInterrupt(SLEEP_TIME);
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getAdmin()
+      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+        .setCoprocessor(SleepOnceCP.class.getName()).build());
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    Closeables.close(CONN, true);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
+    CALLED.set(0);
+
+    table = CONN.getTable(TABLE_NAME);
+    table.setRpcTimeout(RPC_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    table.close();
+  }
+
+  @Test
+  public void testAppend() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+  }
+
+  @Test
+  public void testAppendWhenReturnResultsEqualsFalse() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    Result result = table.append(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)
+      .setReturnResults(false));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
+  public void testIncrement() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    long result = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L);
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertEquals(1L, result);
+  }
+
+  @Test
+  public void testIncrementWhenReturnResultsEqualsFalse() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    Result result = table.increment(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)
+      .setReturnResults(false));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
+  public void testIncrementInRowMutations() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    Result result = table.mutateRow(new RowMutations(row)
+      .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+      .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+  }
+
+  @Test
+  public void testAppendInRowMutations() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    Result result = table.mutateRow(new RowMutations(row)
+      .add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
+      .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2)));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+  }
+
+  @Test
+  public void testIncrementAndAppendInRowMutations() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    Result result = table.mutateRow(new RowMutations(row)
+      .add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+      .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE)));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER2));
+  }
+
+  @Test
+  public void testIncrementInCheckAndMutate() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER2)
+      .build(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L)));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+  }
+
+  @Test
+  public void testAppendInCheckAndMutate() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER2)
+      .build(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE)));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+  }
+
+  @Test
+  public void testIncrementInRowMutationsInCheckAndMutate() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER3)
+      .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+        .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+  }
+
+  @Test
+  public void testAppendInRowMutationsInCheckAndMutate() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER3)
+      .build(new RowMutations(row).add(new Append(row).addColumn(FAMILY, QUALIFIER, VALUE))
+        .add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER2))));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+  }
+
+  @Test
+  public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws IOException {
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 1;
+    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifNotExists(FAMILY, QUALIFIER3)
+      .build(new RowMutations(row).add(new Increment(row).addColumn(FAMILY, QUALIFIER, 1L))
+        .add(new Append(row).addColumn(FAMILY, QUALIFIER2, VALUE))));
+
+    // make sure we called twice and the result is still correct
+    assertEquals(2, CALLED.get());
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
+  }
+
+  @Test
+  public void testBatch() throws IOException, InterruptedException {
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
+    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
+    byte[] row5 = Bytes.toBytes(Bytes.toString(row) + "5");
+    byte[] row6 = Bytes.toBytes(Bytes.toString(row) + "6");
+
+    assertEquals(0, CALLED.get());
+
+    miniBatchOperationCount = 6;
+    Object[] results = new Object[6];
+    table.batch(Arrays.asList(
+      new Append(row).addColumn(FAMILY, QUALIFIER, VALUE),
+      new Increment(row2).addColumn(FAMILY, QUALIFIER, 1L),
+      new RowMutations(row3)
+        .add(new Increment(row3).addColumn(FAMILY, QUALIFIER, 1L))
+        .add(new Append(row3).addColumn(FAMILY, QUALIFIER2, VALUE)),
+      CheckAndMutate.newBuilder(row4)
+        .ifNotExists(FAMILY, QUALIFIER2)
+        .build(new Increment(row4).addColumn(FAMILY, QUALIFIER, 1L)),
+      CheckAndMutate.newBuilder(row5)
+        .ifNotExists(FAMILY, QUALIFIER2)
+        .build(new Append(row5).addColumn(FAMILY, QUALIFIER, VALUE)),
+      CheckAndMutate.newBuilder(row6)
+        .ifNotExists(FAMILY, QUALIFIER3)
+        .build(new RowMutations(row6).add(new Increment(row6).addColumn(FAMILY, QUALIFIER, 1L))
+          .add(new Append(row6).addColumn(FAMILY, QUALIFIER2, VALUE)))), results);
+
+    // make sure the retry happened and the results are still correct: the 6
+    // miniBatchOperations are each observed twice (the timed-out first attempt plus the
+    // retry), so the hook should have been called 12 times
+    assertEquals(12, CALLED.get());
+
+    assertArrayEquals(VALUE, ((Result) results[0]).getValue(FAMILY, QUALIFIER));
+
+    assertEquals(1L, Bytes.toLong(((Result) results[1]).getValue(FAMILY, QUALIFIER)));
+
+    assertEquals(1L, Bytes.toLong(((Result) results[2]).getValue(FAMILY, QUALIFIER)));
+    assertArrayEquals(VALUE, ((Result) results[2]).getValue(FAMILY, QUALIFIER2));
+
+    CheckAndMutateResult result;
+
+    result = (CheckAndMutateResult) results[3];
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+
+    result = (CheckAndMutateResult) results[4];
+    assertTrue(result.isSuccess());
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER));
+
+    result = (CheckAndMutateResult) results[5];
+    assertTrue(result.isSuccess());
+    assertEquals(1L, Bytes.toLong(result.getResult().getValue(FAMILY, QUALIFIER)));
+    assertArrayEquals(VALUE, result.getResult().getValue(FAMILY, QUALIFIER2));
+  }
+}
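
Taken together, the new tests pin down one client-visible guarantee: an Increment or Append is applied exactly once even when the first RPC times out and the client retries, whether it is sent on its own, inside a RowMutations, inside a CheckAndMutate, or as part of a multi-row batch. A minimal, self-contained sketch of that guarantee from an application's point of view (illustrative only; the configuration, table and column names are made up and not taken from this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class NoncedRowMutationsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    byte[] row = Bytes.toBytes("row-1");
    byte[] cf = Bytes.toBytes("cf");
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("example_table"))) {
      // Mix an Increment and an Append in one atomic RowMutations. With this change the
      // request carries a nonce, so an internal client retry (for example after an rpc
      // timeout) cannot bump the counter or append the value a second time.
      Result result = table.mutateRow(new RowMutations(row)
        .add(new Increment(row).addColumn(cf, Bytes.toBytes("counter"), 1L))
        .add(new Append(row).addColumn(cf, Bytes.toBytes("log"), Bytes.toBytes("event"))));
      long counter = Bytes.toLong(result.getValue(cf, Bytes.toBytes("counter")));
      byte[] log = result.getValue(cf, Bytes.toBytes("log"));
      System.out.println("counter=" + counter + " log=" + Bytes.toString(log));
    }
  }
}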