You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2020/09/08 12:32:50 UTC

[hbase] branch branch-2 updated: HBASE-24602 Add Increment and Append support to CheckAndMutate (#2363)

This is an automated email from the ASF dual-hosted git repository.

brfrn169 pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new daccdb1  HBASE-24602 Add Increment and Append support to CheckAndMutate (#2363)
daccdb1 is described below

commit daccdb19a2046f3fd122d9818495cf4742fbfb07
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Tue Sep 8 21:32:35 2020 +0900

    HBASE-24602 Add Increment and Append support to CheckAndMutate (#2363)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../apache/hadoop/hbase/client/CheckAndMutate.java |  33 +-
 .../org/apache/hadoop/hbase/client/HTable.java     | 112 ++-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |   7 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |   4 +
 .../hbase/shaded/protobuf/RequestConverter.java    |  63 +-
 .../hbase/shaded/protobuf/ResponseConverter.java   |   4 +-
 .../hadoop/hbase/coprocessor/RegionObserver.java   |  19 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 806 +++++++++++----------
 .../regionserver/MiniBatchOperationInProgress.java |  18 +
 .../hadoop/hbase/regionserver/OperationStatus.java |  26 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 162 +----
 .../apache/hadoop/hbase/regionserver/Region.java   |   3 +-
 .../hbase/security/access/AccessController.java    |   5 +-
 .../apache/hadoop/hbase/client/TestAsyncTable.java | 230 ++++++
 .../hadoop/hbase/client/TestAsyncTableBatch.java   |  35 +-
 .../hadoop/hbase/client/TestCheckAndMutate.java    | 219 ++++++
 .../hadoop/hbase/client/TestFromClientSide3.java   |  35 +-
 .../hbase/regionserver/TestAtomicOperation.java    |   2 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     | 137 +++-
 19 files changed, 1299 insertions(+), 621 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
index 26eb23d..a163c8d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
@@ -31,8 +31,7 @@ import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
- * Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete}
- * and {@link RowMutations} are supported.
+ * Used to perform CheckAndMutate operations.
  * <p>
  * Use the builder class to instantiate a CheckAndMutate object.
  * This builder class is fluent style APIs, the code are like:
@@ -137,9 +136,9 @@ public final class CheckAndMutate extends Mutation {
     }
 
     private void preCheck(Row action) {
-      Preconditions.checkNotNull(action, "action (Put/Delete/RowMutations) is null");
+      Preconditions.checkNotNull(action, "action is null");
       if (!Bytes.equals(row, action.getRow())) {
-        throw new IllegalArgumentException("The row of the action (Put/Delete/RowMutations) <" +
+        throw new IllegalArgumentException("The row of the action <" +
           Bytes.toStringBinary(action.getRow()) + "> doesn't match the original one <" +
           Bytes.toStringBinary(this.row) + ">");
       }
@@ -175,6 +174,32 @@ public final class CheckAndMutate extends Mutation {
     }
 
     /**
+     * @param increment data to increment if check succeeds
+     * @return a CheckAndMutate object
+     */
+    public CheckAndMutate build(Increment increment) {
+      preCheck(increment);
+      if (filter != null) {
+        return new CheckAndMutate(row, filter, timeRange, increment);
+      } else {
+        return new CheckAndMutate(row, family, qualifier, op, value, timeRange, increment);
+      }
+    }
+
+    /**
+     * @param append data to append if check succeeds
+     * @return a CheckAndMutate object
+     */
+    public CheckAndMutate build(Append append) {
+      preCheck(append);
+      if (filter != null) {
+        return new CheckAndMutate(row, filter, timeRange, append);
+      } else {
+        return new CheckAndMutate(row, family, qualifier, op, value, timeRange, append);
+      }
+    }
+
+    /**
      * @param mutation mutations to perform if check succeeds
      * @return a CheckAndMutate object
      */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a6866d2..7b81f44 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -678,7 +678,7 @@ public class HTable implements Table {
   @Deprecated
   public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
       final byte [] value, final Put put) throws IOException {
-    return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
+    return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
       put).isSuccess();
   }
 
@@ -686,7 +686,7 @@ public class HTable implements Table {
   @Deprecated
   public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
       final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
-    return doCheckAndPut(row, family, qualifier, toCompareOperator(compareOp), value, null,
+    return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
       null, put).isSuccess();
   }
 
@@ -696,33 +696,14 @@ public class HTable implements Table {
       final CompareOperator op, final byte [] value, final Put put) throws IOException {
     // The name of the operators in CompareOperator are intentionally those of the
     // operators in the filter's CompareOp enum.
-    return doCheckAndPut(row, family, qualifier, op, value, null, null, put).isSuccess();
-  }
-
-  private CheckAndMutateResult doCheckAndPut(final byte[] row, final byte[] family,
-    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
-    final TimeRange timeRange, final Put put) throws IOException {
-    ClientServiceCallable<CheckAndMutateResult> callable =
-        new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
-            this.rpcControllerFactory.newController(), put.getPriority()) {
-      @Override
-      protected CheckAndMutateResult rpcCall() throws Exception {
-        MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
-          filter, timeRange, put);
-        MutateResponse response = doMutate(request);
-        return new CheckAndMutateResult(response.getProcessed(), null);
-      }
-    };
-    return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs)
-        .callWithRetries(callable, this.operationTimeoutMs);
+    return doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess();
   }
 
   @Override
   @Deprecated
   public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
     final byte[] value, final Delete delete) throws IOException {
-    return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, null,
+    return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null,
       null, delete).isSuccess();
   }
 
@@ -730,7 +711,7 @@ public class HTable implements Table {
   @Deprecated
   public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
     final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
-    return doCheckAndDelete(row, family, qualifier, toCompareOperator(compareOp), value, null,
+    return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
       null, delete).isSuccess();
   }
 
@@ -738,40 +719,7 @@ public class HTable implements Table {
   @Deprecated
   public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
     final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
-    return doCheckAndDelete(row, family, qualifier, op, value, null, null, delete).isSuccess();
-  }
-
-  private CheckAndMutateResult doCheckAndDelete(final byte[] row, final byte[] family,
-    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
-    final TimeRange timeRange, final Delete delete) throws IOException {
-    CancellableRegionServerCallable<SingleResponse> callable =
-      new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
-        this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
-        new RetryingTimeTracker().start(), delete.getPriority()) {
-        @Override
-        protected SingleResponse rpcCall() throws Exception {
-          MutateRequest request = RequestConverter
-            .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
-              qualifier, op, value, filter, timeRange, delete);
-          MutateResponse response = doMutate(request);
-          return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
-        }
-      };
-    List<Delete> rows = Collections.singletonList(delete);
-    Object[] results = new Object[1];
-    AsyncProcessTask task =
-      AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
-        .setCallable(callable)
-        // TODO any better timeout?
-        .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
-        .setOperationTimeout(operationTimeoutMs)
-        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
-    AsyncRequestFuture ars = multiAp.submit(task);
-    ars.waitUntilDone();
-    if (ars.hasError()) {
-      throw ars.getErrors();
-    }
-    return new CheckAndMutateResult(((SingleResponse.Entry) results[0]).isProcessed(), null);
+    return doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess();
   }
 
   @Override
@@ -856,16 +804,14 @@ public class HTable implements Table {
   @Override
   public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
     Row action = checkAndMutate.getAction();
-    if (action instanceof Put) {
-      Put put = (Put) action;
-      validatePut(put);
-      return doCheckAndPut(checkAndMutate.getRow(), checkAndMutate.getFamily(),
-        checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
-        checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), put);
-    } else if (action instanceof Delete) {
-      return doCheckAndDelete(checkAndMutate.getRow(), checkAndMutate.getFamily(),
+    if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
+      action instanceof Append) {
+      if (action instanceof Put) {
+        validatePut((Put) action);
+      }
+      return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
         checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
-        checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Delete) action);
+        checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action);
     } else {
       return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
         checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
@@ -873,6 +819,29 @@ public class HTable implements Table {
     }
   }
 
+  private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
+    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+    final TimeRange timeRange, final Mutation mutation) throws IOException {
+    ClientServiceCallable<CheckAndMutateResult> callable =
+      new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
+        this.rpcControllerFactory.newController(), mutation.getPriority()) {
+        @Override
+        protected CheckAndMutateResult rpcCall() throws Exception {
+          MutateRequest request = RequestConverter.buildMutateRequest(
+            getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
+            filter, timeRange, mutation);
+          MutateResponse response = doMutate(request);
+          if (response.hasResult()) {
+            return new CheckAndMutateResult(response.getProcessed(),
+              ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()));
+          }
+          return new CheckAndMutateResult(response.getProcessed(), null);
+        }
+      };
+    return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs)
+      .callWithRetries(callable, this.operationTimeoutMs);
+  }
+
   @Override
   public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
     throws IOException {
@@ -1331,13 +1300,14 @@ public class HTable implements Table {
     public boolean thenPut(Put put) throws IOException {
       validatePut(put);
       preCheck();
-      return doCheckAndPut(row, family, qualifier, op, value, null, timeRange, put).isSuccess();
+      return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
+        .isSuccess();
     }
 
     @Override
     public boolean thenDelete(Delete delete) throws IOException {
       preCheck();
-      return doCheckAndDelete(row, family, qualifier, op, value, null, timeRange, delete)
+      return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
         .isSuccess();
     }
 
@@ -1369,12 +1339,12 @@ public class HTable implements Table {
     @Override
     public boolean thenPut(Put put) throws IOException {
       validatePut(put);
-      return doCheckAndPut(row, null, null, null, null, filter, timeRange, put).isSuccess();
+      return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess();
     }
 
     @Override
     public boolean thenDelete(Delete delete) throws IOException {
-      return doCheckAndDelete(row, null, null, null, null, filter, timeRange, delete).isSuccess();
+      return doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess();
     }
 
     @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index aa42838..c3d3246 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -461,8 +461,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     if (checkAndMutate.getAction() instanceof Put) {
       validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
     }
-    if (checkAndMutate.getAction() instanceof Put ||
-      checkAndMutate.getAction() instanceof Delete) {
+    if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
+      || checkAndMutate.getAction() instanceof Increment
+      || checkAndMutate.getAction() instanceof Append) {
       Mutation mutation = (Mutation) checkAndMutate.getAction();
       if (mutation instanceof Put) {
         validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
@@ -475,7 +476,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
             checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
             checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
             checkAndMutate.getTimeRange(), m),
-          (c, r) -> ResponseConverter.getCheckAndMutateResult(r)))
+          (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
         .call();
     } else if (checkAndMutate.getAction() instanceof RowMutations) {
       RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 094065c..b9231bc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -3503,6 +3503,10 @@ public final class ProtobufUtil {
           return builder.build(ProtobufUtil.toPut(mutation, cellScanner));
         case DELETE:
           return builder.build(ProtobufUtil.toDelete(mutation, cellScanner));
+        case INCREMENT:
+          return builder.build(ProtobufUtil.toIncrement(mutation, cellScanner));
+        case APPEND:
+          return builder.build(ProtobufUtil.toAppend(mutation, cellScanner));
         default:
           throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
       }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index c877050..d64968a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -239,7 +239,7 @@ public final class RequestConverter {
   }
 
   /**
-   * Create a protocol buffer MutateRequest for a conditioned put/delete
+   * Create a protocol buffer MutateRequest for a conditioned put/delete/increment/append
    *
    * @return a mutate request
    * @throws IOException
@@ -247,15 +247,9 @@ public final class RequestConverter {
   public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
     final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
     final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
-    MutationType type;
-    if (mutation instanceof Put) {
-      type = MutationType.PUT;
-    } else {
-      type = MutationType.DELETE;
-    }
     return MutateRequest.newBuilder()
       .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
-      .setMutation(ProtobufUtil.toMutation(type, mutation))
+      .setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
       .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
       .build();
   }
@@ -775,16 +769,12 @@ public final class RequestConverter {
       } else if (row instanceof Delete) {
         buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, mutationBuilder);
       } else if (row instanceof Append) {
-        Append a = (Append)row;
-        cells.add(a);
-        builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
-          MutationType.APPEND, a, mutationBuilder, action.getNonce())));
+        buildNoDataRegionAction((Append) row, cells, action.getNonce(), builder, actionBuilder,
+          mutationBuilder);
         hasNonce = true;
       } else if (row instanceof Increment) {
-        Increment i = (Increment)row;
-        cells.add(i);
-        builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
-          MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
+        buildNoDataRegionAction((Increment) row, cells, action.getNonce(), builder, actionBuilder,
+          mutationBuilder);
         hasNonce = true;
       } else if (row instanceof RegionCoprocessorServiceExec) {
         RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
@@ -858,6 +848,16 @@ public final class RequestConverter {
         mutationBuilder.clear();
         buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, actionBuilder,
           mutationBuilder);
+      } else if (cam.getAction() instanceof Increment) {
+        actionBuilder.clear();
+        mutationBuilder.clear();
+        buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
+          actionBuilder, mutationBuilder);
+      } else if (cam.getAction() instanceof Append) {
+        actionBuilder.clear();
+        mutationBuilder.clear();
+        buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
+          actionBuilder, mutationBuilder);
       } else if (cam.getAction() instanceof RowMutations) {
         buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
           mutationBuilder);
@@ -904,6 +904,24 @@ public final class RequestConverter {
     }
   }
 
+  private static void buildNoDataRegionAction(final Increment increment,
+    final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
+    final ClientProtos.Action.Builder actionBuilder,
+    final MutationProto.Builder mutationBuilder) throws IOException {
+    cells.add(increment);
+    regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
+      MutationType.INCREMENT, increment, mutationBuilder, nonce)));
+  }
+
+  private static void buildNoDataRegionAction(final Append append,
+    final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
+    final ClientProtos.Action.Builder actionBuilder,
+    final MutationProto.Builder mutationBuilder) throws IOException {
+    cells.add(append);
+    regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
+      MutationType.APPEND, append, mutationBuilder, nonce)));
+  }
+
   private static void buildNoDataRegionAction(final RowMutations rowMutations,
     final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
     final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
@@ -926,6 +944,19 @@ public final class RequestConverter {
     }
   }
 
+  private static MutationType getMutationType(Mutation mutation) {
+    assert !(mutation instanceof CheckAndMutate);
+    if (mutation instanceof Put) {
+      return MutationType.PUT;
+    } else if (mutation instanceof Delete) {
+      return MutationType.DELETE;
+    } else if (mutation instanceof Increment) {
+      return MutationType.INCREMENT;
+    } else {
+      return MutationType.APPEND;
+    }
+  }
+
 // End utilities for Client
 //Start utilities for Admin
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index ffe3970..87a0df1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -225,11 +225,11 @@ public final class ResponseConverter {
    * @return a CheckAndMutateResult object
    */
   public static CheckAndMutateResult getCheckAndMutateResult(
-    ClientProtos.MutateResponse mutateResponse) {
+    ClientProtos.MutateResponse mutateResponse, CellScanner cells) throws IOException {
     boolean success = mutateResponse.getProcessed();
     Result result = null;
     if (mutateResponse.hasResult()) {
-      result = ProtobufUtil.toResult(mutateResponse.getResult());
+      result = ProtobufUtil.toResult(mutateResponse.getResult(), cells);
     }
     return new CheckAndMutateResult(success, result);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 0bc0631..ab2c8ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -441,8 +441,9 @@ public interface RegionObserver {
   /**
    * This will be called for every batch mutation operation happening at the server. This will be
    * called after acquiring the locks on the mutating rows and after applying the proper timestamp
-   * for each Mutation at the server. The batch may contain Put/Delete. By setting OperationStatus
-   * of Mutations ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}),
+   * for each Mutation at the server. The batch may contain Put/Delete/Increment/Append. By
+   * setting OperationStatus of Mutations
+   * ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}),
    * {@link RegionObserver} can make Region to skip these Mutations.
    * <p>
    * Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
@@ -454,10 +455,12 @@ public interface RegionObserver {
       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {}
 
   /**
-   * This will be called after applying a batch of Mutations on a region. The Mutations are added to
-   * memstore and WAL. The difference of this one with
-   * {@link #postPut(ObserverContext, Put, WALEdit, Durability) }
-   * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability) } is
+   * This will be called after applying a batch of Mutations on a region. The Mutations are added
+   * to memstore and WAL. The difference of this one with
+   * {@link #postPut(ObserverContext, Put, WALEdit, Durability)}
+   * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)}
+   * and {@link #postIncrement(ObserverContext, Increment, Result)}
+   * and {@link #postAppend(ObserverContext, Append, Result)} is
    * this hook will be executed before the mvcc transaction completion.
    * <p>
    * Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
@@ -488,8 +491,8 @@ public interface RegionObserver {
       Operation operation) throws IOException {}
 
   /**
-   * Called after the completion of batch put/delete and will be called even if the batch operation
-   * fails.
+   * Called after the completion of batch put/delete/increment/append and will be called even if
+   * the batch operation fails.
    * <p>
    * Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
    * If need a Cell reference for later use, copy the cell and use that.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1e1bc6a..a934e5d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3211,6 +3211,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     protected final WALEdit[] walEditsFromCoprocessors;
     // reference family cell maps directly so coprocessors can mutate them if desired
     protected final Map<byte[], List<Cell>>[] familyCellMaps;
+    // For Increment/Append operations
+    protected final Result[] results;
+    // For nonce operations
+    protected final boolean[] canProceed;
 
     protected final HRegion region;
     protected int nextIndexToProcess = 0;
@@ -3225,6 +3229,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
       this.walEditsFromCoprocessors = new WALEdit[operations.length];
       familyCellMaps = new Map[operations.length];
+      this.results = new Result[operations.length];
+      this.canProceed = new boolean[operations.length];
 
       this.region = region;
       observedExceptions = new ObservedExceptionsInBatch();
@@ -3279,10 +3285,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     /**
      * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs
-     * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on
-     * entire batch and will be called from outside of class to check and prepare batch. This can
-     * be implemented by calling helper method {@link #checkAndPrepareMutation(int, long)} in a
-     * 'for' loop over mutations.
+     * CP prePut()/preDelete()/preIncrement()/preAppend() hooks for all mutations in a batch. This
+     * is intended to operate on entire batch and will be called from outside of class to check
+     * and prepare batch. This can be implemented by calling helper method
+     * {@link #checkAndPrepareMutation(int, long)} in a 'for' loop over mutations.
      */
     public abstract void checkAndPrepare() throws IOException;
 
@@ -3350,8 +3356,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     /**
      * Helper method that checks and prepares only one mutation. This can be used to implement
      * {@link #checkAndPrepare()} for entire Batch.
-     * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called
-     * after prePut()/ preDelete() CP hooks are run for the mutation
+     * NOTE: As CP prePut()/preDelete()/preIncrement()/preAppend() hooks may modify mutations,
+     * this method should be called after prePut()/preDelete()/preIncrement()/preAppend() CP hooks
+     * are run for the mutation
      */
     protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)
         throws IOException {
@@ -3360,8 +3367,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // Check the families in the put. If bad, skip this one.
         checkAndPreparePut((Put) mutation);
         region.checkTimestamps(mutation.getFamilyCellMap(), timestamp);
-      } else {
+      } else if (mutation instanceof Delete) {
         region.prepareDelete((Delete) mutation);
+      } else if (mutation instanceof Increment || mutation instanceof Append) {
+        region.checkFamilies(mutation.getFamilyCellMap().keySet());
       }
     }
 
@@ -3696,7 +3705,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     @Override
     public void checkAndPrepare() throws IOException {
-      final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes
+      // index 0: puts, index 1: deletes, index 2: increments, index 3: append
+      final int[] metrics = {0, 0, 0, 0};
+
       visitBatchOperations(true, this.size(), new Visitor() {
         private long now = EnvironmentEdgeManager.currentTime();
         private WALEdit walEdit;
@@ -3736,21 +3747,57 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // There were some Deletes in the batch.
           region.metricsRegion.updateDelete();
         }
+        if (metrics[2] > 0) {
+          // There were some Increment in the batch.
+          region.metricsRegion.updateIncrement();
+        }
+        if (metrics[3] > 0) {
+          // There were some Append in the batch.
+          region.metricsRegion.updateAppend();
+        }
       }
     }
 
     @Override
     public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
         long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
-      byte[] byteTS = Bytes.toBytes(timestamp);
       visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
         Mutation mutation = getMutation(index);
         if (mutation instanceof Put) {
-          region.updateCellTimestamps(familyCellMaps[index].values(), byteTS);
+          HRegion.updateCellTimestamps(familyCellMaps[index].values(), Bytes.toBytes(timestamp));
           miniBatchOp.incrementNumOfPuts();
-        } else {
-          region.prepareDeleteTimestamps(mutation, familyCellMaps[index], byteTS);
+        } else if (mutation instanceof Delete) {
+          region.prepareDeleteTimestamps(mutation, familyCellMaps[index],
+            Bytes.toBytes(timestamp));
           miniBatchOp.incrementNumOfDeletes();
+        } else if (mutation instanceof Increment || mutation instanceof Append) {
+          // For nonce operations
+          canProceed[index] = startNonceOperation(nonceGroup, nonce);
+          if (!canProceed[index]) {
+            // convert duplicate increment/append to get
+            List<Cell> results = region.get(toGet(mutation), false, nonceGroup, nonce);
+            retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS,
+              Result.create(results));
+            return true;
+          }
+
+          boolean returnResults;
+          if (mutation instanceof Increment) {
+            returnResults = ((Increment) mutation).isReturnResults();
+            miniBatchOp.incrementNumOfIncrements();
+          } else {
+            returnResults = ((Append) mutation).isReturnResults();
+            miniBatchOp.incrementNumOfAppends();
+          }
+          Result result = doCoprocessorPreCallAfterRowLock(mutation);
+          if (result != null) {
+            retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS,
+              returnResults ? result : Result.EMPTY_RESULT);
+            return true;
+          }
+          List<Cell> results = returnResults ? new ArrayList<>(mutation.size()) : null;
+          familyCellMaps[index] = reckonDeltas(mutation, results, timestamp);
+          this.results[index] = results != null ? Result.create(results): Result.EMPTY_RESULT;
         }
         region.rewriteCellTags(familyCellMaps[index], mutation);
 
@@ -3775,6 +3822,253 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
 
+    /**
+     * Starts the nonce operation for a mutation, if needed.
+     * @param nonceGroup Nonce group from the request.
+     * @param nonce Nonce.
+     * @return whether to proceed this mutation.
+     */
+    private boolean startNonceOperation(long nonceGroup, long nonce) throws IOException {
+      if (region.rsServices == null || region.rsServices.getNonceManager() == null
+        || nonce == HConstants.NO_NONCE) {
+        return true;
+      }
+      boolean canProceed;
+      try {
+        canProceed = region.rsServices.getNonceManager()
+          .startOperation(nonceGroup, nonce, region.rsServices);
+      } catch (InterruptedException ex) {
+        throw new InterruptedIOException("Nonce start operation interrupted");
+      }
+      return canProceed;
+    }
+
+    /**
+     * Ends nonce operation for a mutation, if needed.
+     * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
+     * @param nonce Nonce.
+     * @param success Whether the operation for this nonce has succeeded.
+     */
+    private void endNonceOperation(long nonceGroup, long nonce, boolean success) {
+      if (region.rsServices != null && region.rsServices.getNonceManager() != null
+        && nonce != HConstants.NO_NONCE) {
+        region.rsServices.getNonceManager().endOperation(nonceGroup, nonce, success);
+      }
+    }
+
+    private static Get toGet(final Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Get get = new Get(mutation.getRow());
+      CellScanner cellScanner = mutation.cellScanner();
+      while (!cellScanner.advance()) {
+        Cell cell = cellScanner.current();
+        get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
+      }
+      if (mutation instanceof Increment) {
+        // Increment
+        Increment increment = (Increment) mutation;
+        get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
+      } else {
+        // Append
+        Append append = (Append) mutation;
+        get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
+      }
+      for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
+        get.setAttribute(entry.getKey(), entry.getValue());
+      }
+      return get;
+    }
+
+    /**
+     * Do coprocessor pre-increment or pre-append after row lock call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the preferred Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCallAfterRowLock(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results,
+      long now) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Map<byte[], List<Cell>> ret = new HashMap<>();
+      // Process a Store/family at a time.
+      for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
+        final byte[] columnFamilyName = entry.getKey();
+        List<Cell> deltas = entry.getValue();
+        // Reckon for the Store what to apply to WAL and MemStore.
+        List<Cell> toApply = reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+          now, deltas, results);
+        if (!toApply.isEmpty()) {
+          for (Cell cell : toApply) {
+            HStore store = region.getStore(cell);
+            if (store == null) {
+              region.checkFamily(CellUtil.cloneFamily(cell));
+            } else {
+              ret.computeIfAbsent(store.getColumnFamilyDescriptor().getName(),
+                key -> new ArrayList<>()).add(cell);
+            }
+          }
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed
+     * column family/Store.
+     *
+     * Does Get of current value and then adds passed in deltas for this Store returning the
+     * result.
+     *
+     * @param mutation The encompassing Mutation object
+     * @param deltas Changes to apply to this Store; either increment amount or data to append
+     * @param results In here we accumulate all the Cells we are to return to the client. If null,
+     *   client doesn't want results returned.
+     * @return Resulting Cells after <code>deltas</code> have been applied to current
+     *   values. Side effect is our filling out of the <code>results</code> List.
+     */
+    private List<Cell> reckonDeltasByStore(HStore store, Mutation mutation, long now,
+      List<Cell> deltas, List<Cell> results) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
+      List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
+
+      // Get previous values for all columns in this family.
+      TimeRange tr;
+      if (mutation instanceof Increment) {
+        tr = ((Increment) mutation).getTimeRange();
+      } else {
+        tr = ((Append) mutation).getTimeRange();
+      }
+      List<Cell> currentValues = get(mutation, store, deltas, tr);
+
+      // Iterate the input columns and update existing values if they were found, otherwise
+      // add new column initialized to the delta amount
+      int currentValuesIndex = 0;
+      for (int i = 0; i < deltas.size(); i++) {
+        Cell delta = deltas.get(i);
+        Cell currentValue = null;
+        if (currentValuesIndex < currentValues.size() &&
+          CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
+          currentValue = currentValues.get(currentValuesIndex);
+          if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
+            currentValuesIndex++;
+          }
+        }
+        // Switch on whether this an increment or an append building the new Cell to apply.
+        Cell newCell;
+        if (mutation instanceof Increment) {
+          long deltaAmount = getLongValue(delta);
+          final long newValue = currentValue == null ?
+            deltaAmount : getLongValue(currentValue) + deltaAmount;
+          newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
+            (oldCell) -> Bytes.toBytes(newValue));
+        } else {
+          newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
+            (oldCell) ->
+              ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
+                .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
+                .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())
+                .array()
+          );
+        }
+        if (region.maxCellSize > 0) {
+          int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell);
+          if (newCellSize > region.maxCellSize) {
+            String msg = "Cell with size " + newCellSize + " exceeds limit of "
+              + region.maxCellSize + " bytes in region " + this;
+            LOG.debug(msg);
+            throw new DoNotRetryIOException(msg);
+          }
+        }
+        cellPairs.add(new Pair<>(currentValue, newCell));
+        // Add to results to get returned to the Client. If null, cilent does not want results.
+        if (results != null) {
+          results.add(newCell);
+        }
+      }
+      // Give coprocessors a chance to update the new cells before apply to WAL or memstore
+      if (region.coprocessorHost != null) {
+        // Here the operation must be increment or append.
+        cellPairs = mutation instanceof Increment ?
+          region.coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
+          region.coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
+      }
+      return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
+    }
+
+    private static Cell reckonDelta(final Cell delta, final Cell currentCell,
+      final byte[] columnFamily, final long now, Mutation mutation,
+      Function<Cell, byte[]> supplier) throws IOException {
+      // Forward any tags found on the delta.
+      List<Tag> tags = TagUtil.carryForwardTags(delta);
+      tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
+      if (currentCell != null) {
+        tags = TagUtil.carryForwardTags(tags, currentCell);
+        byte[] newValue = supplier.apply(currentCell);
+        return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+          .setRow(mutation.getRow(), 0, mutation.getRow().length)
+          .setFamily(columnFamily, 0, columnFamily.length)
+          // copy the qualifier if the cell is located in shared memory.
+          .setQualifier(CellUtil.cloneQualifier(delta))
+          .setTimestamp(Math.max(currentCell.getTimestamp() + 1, now))
+          .setType(KeyValue.Type.Put.getCode())
+          .setValue(newValue, 0, newValue.length)
+          .setTags(TagUtil.fromList(tags))
+          .build();
+      } else {
+        PrivateCellUtil.updateLatestStamp(delta, now);
+        return CollectionUtils.isEmpty(tags) ? delta : PrivateCellUtil.createCell(delta, tags);
+      }
+    }
+
+    /**
+     * @return Get the long out of the passed in Cell
+     */
+    private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
+      int len = cell.getValueLength();
+      if (len != Bytes.SIZEOF_LONG) {
+        // throw DoNotRetryIOException instead of IllegalArgumentException
+        throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
+      }
+      return PrivateCellUtil.getValueAsLong(cell);
+    }
+
+    /**
+     * Do a specific Get on passed <code>columnFamily</code> and column qualifiers.
+     * @param mutation Mutation we are doing this Get for.
+     * @param store Which column family on row (TODO: Go all Gets in one go)
+     * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.
+     * @return Return list of Cells found.
+     */
+    private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates,
+      TimeRange tr) throws IOException {
+      // Sort the cells so that they match the order that they appear in the Get results.
+      // Otherwise, we won't be able to find the existing values if the cells are not specified
+      // in order by the client since cells are in an array list.
+      // TODO: I don't get why we are sorting. St.Ack 20150107
+      sort(coordinates, store.getComparator());
+      Get get = new Get(mutation.getRow());
+      for (Cell cell: coordinates) {
+        get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell));
+      }
+      // Increments carry time range. If an Increment instance, put it on the Get.
+      if (tr != null) {
+        get.setTimeRange(tr.getMin(), tr.getMax());
+      }
+      return region.get(get, false);
+    }
+
     @Override
     public List<Pair<NonceKey, WALEdit>> buildWALEdits(final MiniBatchOperationInProgress<Mutation>
         miniBatchOp) throws IOException {
@@ -3807,6 +4101,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         region.coprocessorHost.postBatchMutate(miniBatchOp);
       }
       super.completeMiniBatchOperations(miniBatchOp, writeEntry);
+
+      if (nonce != HConstants.NO_NONCE) {
+        if (region.rsServices != null && region.rsServices.getNonceManager() != null) {
+          region.rsServices.getNonceManager()
+            .addMvccToOperationContext(nonceGroup, nonce, writeEntry.getWriteNumber());
+        }
+      }
     }
 
     @Override
@@ -3818,24 +4119,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // synced so that the coprocessor contract is adhered to.
         if (region.coprocessorHost != null) {
           visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
-            // only for successful puts
+            // only for successful puts/deletes/increments/appends
             if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS) {
               Mutation m = getMutation(i);
               if (m instanceof Put) {
                 region.coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
-              } else {
+              } else if (m instanceof Delete) {
                 region.coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
+              } else if (m instanceof Increment) {
+                Result result = region.getCoprocessorHost().postIncrement((Increment) m,
+                  results[i]);
+                if (result != results[i]) {
+                  retCodeDetails[i] =
+                    new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result);
+                }
+              } else if (m instanceof Append) {
+                Result result = region.getCoprocessorHost().postAppend((Append) m, results[i]);
+                if (result != results[i]) {
+                  retCodeDetails[i] =
+                    new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result);
+                }
               }
             }
             return true;
           });
         }
 
+        // For nonce operations
+        visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
+          if (canProceed[i]) {
+            endNonceOperation(nonceGroup, nonce,
+              retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS);
+          }
+          return true;
+        });
+
         // See if the column families were consistent through the whole thing.
         // if they were then keep them. If they were not then pass a null.
         // null will be treated as unknown.
-        // Total time taken might be involving Puts and Deletes.
-        // Split the time for puts and deletes based on the total number of Puts and Deletes.
+        // Total time taken might be involving Puts, Deletes, Increments and Appends.
+        // Split the time for puts and deletes based on the total number of Puts, Deletes,
+        // Increments and Appends.
         if (region.metricsRegion != null) {
           if (miniBatchOp.getNumOfPuts() > 0) {
             // There were some Puts in the batch.
@@ -3845,6 +4169,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             // There were some Deletes in the batch.
             region.metricsRegion.updateDelete();
           }
+          if (miniBatchOp.getNumOfIncrements() > 0) {
+            // There were some Increments in the batch.
+            region.metricsRegion.updateIncrement();
+          }
+          if (miniBatchOp.getNumOfAppends() > 0) {
+            // There were some Appends in the batch.
+            region.metricsRegion.updateAppend();
+          }
         }
       }
 
@@ -3856,8 +4188,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     /**
-     * Runs prePut/ preDelete coprocessor hook for input mutation in a batch
-     * @param metrics Array of 2 ints. index 0: count of puts and index 1: count of deletes
+     * Runs prePut/preDelete/preIncrement/preAppend coprocessor hook for input mutation in a batch
+     * @param metrics Array of 2 ints. index 0: count of puts, index 1: count of deletes, index 2:
+     *   count of increments and 3: count of appends
      */
     private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] metrics)
         throws IOException {
@@ -3883,13 +4216,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           metrics[1]++;
           retCodeDetails[index] = OperationStatus.SUCCESS;
         }
+      } else if (m instanceof Increment) {
+        Increment increment = (Increment) m;
+        Result result = region.coprocessorHost.preIncrement(increment);
+        if (result != null) {
+          // pre hook says skip this Increment
+          // mark as success and skip in doMiniBatchMutation
+          metrics[2]++;
+          retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result);
+        }
+      } else if (m instanceof Append) {
+        Append append = (Append) m;
+        Result result = region.coprocessorHost.preAppend(append);
+        if (result != null) {
+          // pre hook says skip this Append
+          // mark as success and skip in doMiniBatchMutation
+          metrics[3]++;
+          retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result);
+        }
       } else {
-        String msg = "Put/Delete mutations only supported in a batch";
-        // In case of passing Append mutations along with the Puts and Deletes in batchMutate
-        // mark the operation return code as failure so that it will not be considered in
-        // the doMiniBatchMutation
+        String msg = "Put/Delete/Increment/Append mutations only supported in a batch";
         retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, msg);
-
         if (isAtomic()) { // fail, atomic means all or none
           throw new IOException(msg);
         }
@@ -4060,15 +4407,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
-      throws IOException {
-    return batchMutate(mutations, false, nonceGroup, nonce);
-  }
-
   public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
       long nonce) throws IOException {
     // As it stands, this is used for 3 things
-    //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
+    //  * batchMutate with single mutation - put/delete/increment/append, separate or from
+    //    checkAndMutate.
     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
     return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce));
@@ -4076,7 +4419,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
-    return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    // If the mutations has any Increment/Append operations, we need to do batchMutate atomically
+    boolean atomic = Arrays.stream(mutations)
+      .anyMatch(m -> m instanceof Increment || m instanceof Append);
+    return batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
   public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
@@ -4106,12 +4452,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Perform a batch of mutations.
    *
-   * It supports only Put and Delete mutations and will ignore other types passed. Operations in
-   * a batch are stored with highest durability specified of for all operations in a batch,
-   * except for {@link Durability#SKIP_WAL}.
+   * It supports Put, Delete, Increment, Append mutations and will ignore other types passed.
+   * Operations in a batch are stored with highest durability specified of for all operations in a
+   * batch, except for {@link Durability#SKIP_WAL}.
    *
    * <p>This function is called from {@link #batchReplay(WALSplitUtil.MutationReplay[], long)} with
-   * {@link ReplayBatchOperation} instance and {@link #batchMutate(Mutation[], long, long)} with
+   * {@link ReplayBatchOperation} instance and {@link #batchMutate(Mutation[])} with
    * {@link MutationBatchOperation} instance as an argument. As the processing of replay batch
    * and mutation batch is very similar, lot of code is shared by providing generic methods in
    * base class {@link BatchOperation}. The logic for this method and
@@ -4136,7 +4482,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!initialized) {
           this.writeRequestsCount.add(batchOp.size());
           // validate and prepare batch for write, for MutationBatchOperation it also calls CP
-          // prePut()/ preDelete() hooks
+          // prePut()/preDelete()/preIncrement()/preAppend() hooks
           batchOp.checkAndPrepare();
           initialized = true;
         }
@@ -4154,7 +4500,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
-   * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
+   * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[])}
    * In here we also handle replay of edits on region recover. Also gets change in size brought
    * about by applying {@code batchOp}.
    */
@@ -4175,16 +4521,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // We've now grabbed as many mutations off the list as we can
       // Ensure we acquire at least one.
       if (miniBatchOp.getReadyToWriteCount() <= 0) {
-        // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
+        // Nothing to put/delete/increment/append -- an exception in the above such as
+        // NoSuchColumnFamily?
         return;
       }
 
       lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
       locked = true;
 
-      // STEP 2. Update mini batch of all operations in progress with  LATEST_TIMESTAMP timestamp
+      // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
       // We should record the timestamp only after we have acquired the rowLock,
-      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
+      // otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer
+      // timestamp
       long now = EnvironmentEdgeManager.currentTime();
       batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);
 
@@ -4230,11 +4578,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       final int finalLastIndexExclusive =
           miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
       final boolean finalSuccess = success;
-      batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> {
-        batchOp.retCodeDetails[i] =
-            finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
-        return true;
-      });
+      batchOp.visitBatchOperations(true, finalLastIndexExclusive,
+        (int i) -> {
+          Mutation mutation = batchOp.getMutation(i);
+          if (mutation instanceof Increment || mutation instanceof Append) {
+            if (finalSuccess) {
+              batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS,
+                batchOp.results[i]);
+            } else {
+              batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
+            }
+          } else {
+            batchOp.retCodeDetails[i] =
+              finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
+          }
+          return true;
+        });
 
       batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess);
 
@@ -4408,7 +4767,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
         }
 
-        // If matches put the new put or delete the new delete
+        // If matches, perform the mutation or the rowMutations
         if (matches) {
           // We have acquired the row lock already. If the system clock is NOT monotonically
           // non-decreasing (see HBASE-14070) we should make sure that the mutation has a
@@ -4433,13 +4792,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             // timestamp from get (see prepareDeleteTimestamps).
           }
           // All edits for the given row (across all column families) must happen atomically.
+          Result r = null;
           if (mutation != null) {
-            doBatchMutate(mutation);
+            r = doBatchMutate(mutation, true).getResult();
           } else {
             mutateRow(rowMutations);
           }
           this.checkAndMutateChecksPassed.increment();
-          return new CheckAndMutateResult(true, null);
+          return new CheckAndMutateResult(true, r);
         }
         this.checkAndMutateChecksFailed.increment();
         return new CheckAndMutateResult(false, null);
@@ -4453,9 +4813,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private void checkMutationType(final Mutation mutation)
   throws DoNotRetryIOException {
-    boolean isPut = mutation instanceof Put;
-    if (!isPut && !(mutation instanceof Delete)) {
-      throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must be Put or Delete");
+    if (!(mutation instanceof Put) && !(mutation instanceof Delete) &&
+      !(mutation instanceof Increment) && !(mutation instanceof Append)) {
+      throw new org.apache.hadoop.hbase.DoNotRetryIOException(
+        "Action must be Put or Delete or Increment or Delete");
     }
   }
 
@@ -4493,17 +4854,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return matches;
   }
 
+  private OperationStatus doBatchMutate(Mutation mutation) throws IOException {
+    return doBatchMutate(mutation, false);
+  }
 
-  private void doBatchMutate(Mutation mutation) throws IOException {
-    // Currently this is only called for puts and deletes, so no nonces.
-    OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
+  private OperationStatus doBatchMutate(Mutation mutation, boolean atomic) throws IOException {
+    return doBatchMutate(mutation, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  private OperationStatus doBatchMutate(Mutation mutation, boolean atomic, long nonceGroup,
+    long nonce) throws IOException {
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}, atomic,
+      nonceGroup, nonce);
     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
-    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
+    } else if (batchMutate[0].getOperationStatusCode().equals(
+      OperationStatusCode.STORE_TOO_BUSY)) {
       throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
     }
+    return batchMutate[0];
   }
 
   /**
@@ -8021,8 +8392,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  public Result append(Append mutation, long nonceGroup, long nonce) throws IOException {
-    return doDelta(Operation.APPEND, mutation, nonceGroup, nonce, mutation.isReturnResults());
+  public Result append(Append append, long nonceGroup, long nonce) throws IOException {
+    checkReadOnly();
+    checkResources();
+    startRegionOperation(Operation.APPEND);
+    try {
+      // All edits for the given row (across all column families) must happen atomically.
+      return doBatchMutate(append, true, nonceGroup, nonce).getResult();
+    } finally {
+      closeRegionOperation(Operation.APPEND);
+    }
   }
 
   @Override
@@ -8030,110 +8409,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  public Result increment(Increment mutation, long nonceGroup, long nonce) throws IOException {
-    return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce, mutation.isReturnResults());
-  }
-
-  /**
-   * Add "deltas" to Cells. Deltas are increments or appends. Switch on <code>op</code>.
-   *
-   * <p>If increment, add deltas to current values or if an append, then
-   * append the deltas to the current Cell values.
-   *
-   * <p>Append and Increment code paths are mostly the same. They differ in just a few places.
-   * This method does the code path for increment and append and then in key spots, switches
-   * on the passed in <code>op</code> to do increment or append specific paths.
-   */
-  private Result doDelta(Operation op, Mutation mutation, long nonceGroup, long nonce,
-      boolean returnResults) throws IOException {
+  public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException {
     checkReadOnly();
     checkResources();
-    checkRow(mutation.getRow(), op.toString());
-    checkFamilies(mutation.getFamilyCellMap().keySet());
-    this.writeRequestsCount.increment();
-    WriteEntry writeEntry = null;
-    startRegionOperation(op);
-    List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null;
-    RowLock rowLock = null;
-    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
+    startRegionOperation(Operation.INCREMENT);
     try {
-      rowLock = getRowLockInternal(mutation.getRow(), false, null);
-      lock(this.updatesLock.readLock());
-      try {
-        Result cpResult = doCoprocessorPreCall(op, mutation);
-        if (cpResult != null) {
-          // Metrics updated below in the finally block.
-          return returnResults? cpResult: null;
-        }
-        Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());
-        Map<HStore, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());
-        // Reckon Cells to apply to WAL --  in returned walEdit -- and what to add to memstore and
-        // what to return back to the client (in 'forMemStore' and 'results' respectively).
-        WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results);
-        // Actually write to WAL now if a walEdit to apply.
-        if (walEdit != null && !walEdit.isEmpty()) {
-          writeEntry = doWALAppend(walEdit, effectiveDurability, nonceGroup, nonce);
-        } else {
-          // If walEdits is empty, it means we skipped the WAL; update LongAdders and start an mvcc
-          // transaction.
-          recordMutationWithoutWal(mutation.getFamilyCellMap());
-          writeEntry = mvcc.begin();
-          updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
-        }
-        // Now write to MemStore. Do it a column family at a time.
-        for (Map.Entry<HStore, List<Cell>> e : forMemStore.entrySet()) {
-          applyToMemStore(e.getKey(), e.getValue(), true, memstoreAccounting);
-        }
-        mvcc.completeAndWait(writeEntry);
-        if (rsServices != null && rsServices.getNonceManager() != null) {
-          rsServices.getNonceManager().addMvccToOperationContext(nonceGroup, nonce,
-            writeEntry.getWriteNumber());
-        }
-        if (rsServices != null && rsServices.getMetrics() != null) {
-          rsServices.getMetrics().updateWriteQueryMeter(this.htableDescriptor.
-            getTableName());
-        }
-        writeEntry = null;
-      } finally {
-        this.updatesLock.readLock().unlock();
-      }
-      // If results is null, then client asked that we not return the calculated results.
-      return results != null && returnResults? Result.create(results): Result.EMPTY_RESULT;
+      // All edits for the given row (across all column families) must happen atomically.
+      return doBatchMutate(increment, true, nonceGroup, nonce).getResult();
     } finally {
-      // Call complete always, even on success. doDelta is doing a Get READ_UNCOMMITTED when it goes
-      // to get current value under an exclusive lock so no need so no need to wait to return to
-      // the client. Means only way to read-your-own-increment or append is to come in with an
-      // a 0 increment.
-      if (writeEntry != null) mvcc.complete(writeEntry);
-      if (rowLock != null) {
-        rowLock.release();
-      }
-      // Request a cache flush if over the limit.  Do it outside update lock.
-      incMemStoreSize(memstoreAccounting.getMemStoreSize());
-      requestFlushIfNeeded();
-      closeRegionOperation(op);
-      if (this.metricsRegion != null) {
-        switch (op) {
-          case INCREMENT:
-            this.metricsRegion.updateIncrement();
-            break;
-          case APPEND:
-            this.metricsRegion.updateAppend();
-            break;
-          default:
-            break;
-        }
-      }
+      closeRegionOperation(Operation.INCREMENT);
     }
   }
 
-  private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, long nonceGroup,
-      long nonce)
-  throws IOException {
-    return doWALAppend(walEdit, durability, WALKey.EMPTY_UUIDS, System.currentTimeMillis(),
-      nonceGroup, nonce);
-  }
-
   private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
       long now, long nonceGroup, long nonce) throws IOException {
     return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce,
@@ -8184,223 +8471,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return writeEntry;
   }
 
-  /**
-   * Do coprocessor pre-increment or pre-append call.
-   * @return Result returned out of the coprocessor, which means bypass all further processing and
-   *  return the proffered Result instead, or null which means proceed.
-   */
-  private Result doCoprocessorPreCall(final Operation op, final Mutation mutation)
-  throws IOException {
-    Result result = null;
-    if (this.coprocessorHost != null) {
-      switch(op) {
-        case INCREMENT:
-          result = this.coprocessorHost.preIncrementAfterRowLock((Increment)mutation);
-          break;
-        case APPEND:
-          result = this.coprocessorHost.preAppendAfterRowLock((Append)mutation);
-          break;
-        default: throw new UnsupportedOperationException(op.toString());
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Reckon the Cells to apply to WAL, memstore, and to return to the Client; these Sets are not
-   * always the same dependent on whether to write WAL.
-   *
-   * @param results Fill in here what goes back to the Client if it is non-null (if null, client
-   *  doesn't want results).
-   * @param forMemStore Fill in here what to apply to the MemStore (by Store).
-   * @return A WALEdit to apply to WAL or null if we are to skip the WAL.
-   */
-  private WALEdit reckonDeltas(Operation op, Mutation mutation, Durability effectiveDurability,
-      Map<HStore, List<Cell>> forMemStore, List<Cell> results) throws IOException {
-    WALEdit walEdit = null;
-    long now = EnvironmentEdgeManager.currentTime();
-    final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
-    // Process a Store/family at a time.
-    for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
-      final byte[] columnFamilyName = entry.getKey();
-      List<Cell> deltas = entry.getValue();
-      // Reckon for the Store what to apply to WAL and MemStore.
-      List<Cell> toApply = reckonDeltasByStore(stores.get(columnFamilyName), op, mutation,
-        effectiveDurability, now, deltas, results);
-      if (!toApply.isEmpty()) {
-        for (Cell cell : toApply) {
-          HStore store = getStore(cell);
-          if (store == null) {
-            checkFamily(CellUtil.cloneFamily(cell));
-          } else {
-            forMemStore.computeIfAbsent(store, key -> new ArrayList<>()).add(cell);
-          }
-        }
-        if (writeToWAL) {
-          if (walEdit == null) {
-            walEdit = new WALEdit();
-          }
-          walEdit.getCells().addAll(toApply);
-        }
-      }
-    }
-    return walEdit;
-  }
-
-  /**
-   * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed
-   * column family/Store.
-   *
-   * Does Get of current value and then adds passed in deltas for this Store returning the result.
-   *
-   * @param op Whether Increment or Append
-   * @param mutation The encompassing Mutation object
-   * @param deltas Changes to apply to this Store; either increment amount or data to append
-   * @param results In here we accumulate all the Cells we are to return to the client. If null,
-   *                client doesn't want results returned.
-   * @return Resulting Cells after <code>deltas</code> have been applied to current
-   *  values. Side effect is our filling out of the <code>results</code> List.
-   */
-  private List<Cell> reckonDeltasByStore(HStore store, Operation op, Mutation mutation,
-      Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)
-      throws IOException {
-    byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
-    List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
-    // Get previous values for all columns in this family.
-    TimeRange tr = null;
-    switch (op) {
-      case INCREMENT:
-        tr = ((Increment)mutation).getTimeRange();
-        break;
-      case APPEND:
-        tr = ((Append)mutation).getTimeRange();
-        break;
-      default:
-        break;
-    }
-    List<Cell> currentValues = get(mutation, store, deltas,null, tr);
-    // Iterate the input columns and update existing values if they were found, otherwise
-    // add new column initialized to the delta amount
-    int currentValuesIndex = 0;
-    for (int i = 0; i < deltas.size(); i++) {
-      Cell delta = deltas.get(i);
-      Cell currentValue = null;
-      if (currentValuesIndex < currentValues.size() &&
-          CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
-        currentValue = currentValues.get(currentValuesIndex);
-        if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
-          currentValuesIndex++;
-        }
-      }
 
-      // Switch on whether this an increment or an append building the new Cell to apply.
-      Cell newCell = null;
-      switch (op) {
-        case INCREMENT:
-          long deltaAmount = getLongValue(delta);
-          final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount;
-          newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue));
-          break;
-        case APPEND:
-          newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) ->
-            ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
-                    .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
-                    .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())
-                    .array()
-          );
-          break;
-        default: throw new UnsupportedOperationException(op.toString());
-      }
-      if (this.maxCellSize > 0) {
-        int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell);
-        if (newCellSize > this.maxCellSize) {
-          String msg = "Cell with size " + newCellSize + " exceeds limit of " + this.maxCellSize
-            + " bytes in region " + this;
-          LOG.debug(msg);
-          throw new DoNotRetryIOException(msg);
-        }
-      }
-      cellPairs.add(new Pair<>(currentValue, newCell));
-      // Add to results to get returned to the Client. If null, cilent does not want results.
-      if (results != null) {
-        results.add(newCell);
-      }
-    }
 
-    // Give coprocessors a chance to update the new cells before apply to WAL or memstore
-    if (coprocessorHost != null) {
-      // Here the operation must be increment or append.
-      cellPairs = op == Operation.INCREMENT ?
-          coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
-          coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
-    }
-    return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
-  }
-
-  private static Cell reckonDelta(final Cell delta, final Cell currentCell,
-                                  final byte[] columnFamily, final long now,
-                                  Mutation mutation, Function<Cell, byte[]> supplier) throws IOException {
-    // Forward any tags found on the delta.
-    List<Tag> tags = TagUtil.carryForwardTags(delta);
-    tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
-    if (currentCell != null) {
-      tags = TagUtil.carryForwardTags(tags, currentCell);
-      byte[] newValue = supplier.apply(currentCell);
-      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
-              .setRow(mutation.getRow(), 0, mutation.getRow().length)
-              .setFamily(columnFamily, 0, columnFamily.length)
-              // copy the qualifier if the cell is located in shared memory.
-              .setQualifier(CellUtil.cloneQualifier(delta))
-              .setTimestamp(Math.max(currentCell.getTimestamp() + 1, now))
-              .setType(KeyValue.Type.Put.getCode())
-              .setValue(newValue, 0, newValue.length)
-              .setTags(TagUtil.fromList(tags))
-              .build();
-    } else {
-      PrivateCellUtil.updateLatestStamp(delta, now);
-      return CollectionUtils.isEmpty(tags) ? delta : PrivateCellUtil.createCell(delta, tags);
-    }
-  }
 
-  /**
-   * @return Get the long out of the passed in Cell
-   */
-  private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
-    int len = cell.getValueLength();
-    if (len != Bytes.SIZEOF_LONG) {
-      // throw DoNotRetryIOException instead of IllegalArgumentException
-      throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
-    }
-    return PrivateCellUtil.getValueAsLong(cell);
-  }
 
-  /**
-   * Do a specific Get on passed <code>columnFamily</code> and column qualifiers.
-   * @param mutation Mutation we are doing this Get for.
-   * @param store Which column family on row (TODO: Go all Gets in one go)
-   * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.
-   * @return Return list of Cells found.
-   */
-  private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates,
-      IsolationLevel isolation, TimeRange tr) throws IOException {
-    // Sort the cells so that they match the order that they appear in the Get results. Otherwise,
-    // we won't be able to find the existing values if the cells are not specified in order by the
-    // client since cells are in an array list.
-    // TODO: I don't get why we are sorting. St.Ack 20150107
-    sort(coordinates, store.getComparator());
-    Get get = new Get(mutation.getRow());
-    if (isolation != null) {
-      get.setIsolationLevel(isolation);
-    }
-    for (Cell cell: coordinates) {
-      get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell));
-    }
-    // Increments carry time range. If an Increment instance, put it on the Get.
-    if (tr != null) {
-      get.setTimeRange(tr.getMin(), tr.getMax());
-    }
-    return get(get, false);
-  }
 
   /**
    * @return Sorted list of <code>cells</code> using <code>comparator</code>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index 65d2f55..ae5b6ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -45,6 +45,8 @@ public class MiniBatchOperationInProgress<T> {
   private int cellCount = 0;
   private int numOfPuts = 0;
   private int numOfDeletes = 0;
+  private int numOfIncrements = 0;
+  private int numOfAppends = 0;
 
 
   public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
@@ -169,4 +171,20 @@ public class MiniBatchOperationInProgress<T> {
   public void incrementNumOfDeletes() {
     this.numOfDeletes += 1;
   }
+
+  public int getNumOfIncrements() {
+    return numOfIncrements;
+  }
+
+  public void incrementNumOfIncrements() {
+    this.numOfIncrements += 1;
+  }
+
+  public int getNumOfAppends() {
+    return numOfAppends;
+  }
+
+  public void incrementNumOfAppends() {
+    this.numOfAppends += 1;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
index 21027d3..6beb7c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -43,21 +44,29 @@ public class OperationStatus {
   public static final OperationStatus NOT_RUN = new OperationStatus(OperationStatusCode.NOT_RUN);
 
   private final OperationStatusCode code;
-
+  private final Result result;
   private final String exceptionMsg;
 
   public OperationStatus(OperationStatusCode code) {
-    this(code, "");
+    this(code, null, "");
+  }
+
+  public OperationStatus(OperationStatusCode code, Result result) {
+    this(code, result, "");
   }
 
   public OperationStatus(OperationStatusCode code, String exceptionMsg) {
-    this.code = code;
-    this.exceptionMsg = exceptionMsg;
+    this(code, null, exceptionMsg);
   }
 
   public OperationStatus(OperationStatusCode code, Exception e) {
+    this(code, null, (e == null) ? "" : e.getClass().getName() + ": " + e.getMessage());
+  }
+
+  private OperationStatus(OperationStatusCode code, Result result, String exceptionMsg) {
     this.code = code;
-    this.exceptionMsg = (e == null) ? "" : e.getClass().getName() + ": " + e.getMessage();
+    this.result = result;
+    this.exceptionMsg = exceptionMsg;
   }
 
   /**
@@ -68,6 +77,13 @@ public class OperationStatus {
   }
 
   /**
+   * @return result
+   */
+  public Result getResult() {
+    return result;
+  }
+
+  /**
    * @return ExceptionMessge
    */
   public String getExceptionMsg() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7fe7f19..eaa8ca0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.io.UncheckedIOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.BindException;
@@ -549,38 +548,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  /**
-   * Starts the nonce operation for a mutation, if needed.
-   * @param mutation Mutation.
-   * @param nonceGroup Nonce group from the request.
-   * @return whether to proceed this mutation.
-   */
-  private boolean startNonceOperation(final MutationProto mutation, long nonceGroup)
-      throws IOException {
-    if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
-    boolean canProceed = false;
-    try {
-      canProceed = regionServer.nonceManager.startOperation(
-        nonceGroup, mutation.getNonce(), regionServer);
-    } catch (InterruptedException ex) {
-      throw new InterruptedIOException("Nonce start operation interrupted");
-    }
-    return canProceed;
-  }
-
-  /**
-   * Ends nonce operation for a mutation, if needed.
-   * @param mutation Mutation.
-   * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
-   * @param success Whether the operation for this nonce has succeeded.
-   */
-  private void endNonceOperation(final MutationProto mutation,
-      long nonceGroup, boolean success) {
-    if (regionServer.nonceManager != null && mutation.hasNonce()) {
-      regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
-    }
-  }
-
   private boolean isClientCellBlockSupport(RpcCallContext context) {
     return context != null && context.isClientCellBlockSupported();
   }
@@ -618,7 +585,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
   private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
-    CellScanner cellScanner, Condition condition,ActivePolicyEnforcement spaceQuotaEnforcement)
+    CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement)
     throws IOException {
     int countOfCompleteMutation = 0;
     try {
@@ -691,35 +658,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     checkCellSizeLimit(region, append);
     spaceQuota.getPolicyEnforcement(region).check(append);
     quota.addMutation(append);
-    Result r = null;
-    if (region.getCoprocessorHost() != null) {
-      r = region.getCoprocessorHost().preAppend(append);
-    }
-    if (r == null) {
-      boolean canProceed = startNonceOperation(mutation, nonceGroup);
-      boolean success = false;
-      try {
-        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
-        if (canProceed) {
-          r = region.append(append, nonceGroup, nonce);
-        } else {
-          // convert duplicate append to get
-          List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce);
-          r = Result.create(results);
-        }
-        success = true;
-      } finally {
-        if (canProceed) {
-          endNonceOperation(mutation, nonceGroup, success);
-        }
-      }
-      if (region.getCoprocessorHost() != null) {
-        r = region.getCoprocessorHost().postAppend(append, r);
-      }
-    }
+    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
+    Result r = region.append(append, nonceGroup, nonce);
     if (regionServer.getMetrics() != null) {
-      regionServer.getMetrics().updateAppend(
-          region.getTableDescriptor().getTableName(),
+      regionServer.getMetrics().updateAppend(region.getTableDescriptor().getTableName(),
         EnvironmentEdgeManager.currentTime() - before);
     }
     return r == null ? Result.EMPTY_RESULT : r;
@@ -737,66 +679,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     checkCellSizeLimit(region, increment);
     spaceQuota.getPolicyEnforcement(region).check(increment);
     quota.addMutation(increment);
-    Result r = null;
-    if (region.getCoprocessorHost() != null) {
-      r = region.getCoprocessorHost().preIncrement(increment);
-    }
-    if (r == null) {
-      boolean canProceed = startNonceOperation(mutation, nonceGroup);
-      boolean success = false;
-      try {
-        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
-        if (canProceed) {
-          r = region.increment(increment, nonceGroup, nonce);
-        } else {
-          // convert duplicate increment to get
-          List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce);
-          r = Result.create(results);
-        }
-        success = true;
-      } finally {
-        if (canProceed) {
-          endNonceOperation(mutation, nonceGroup, success);
-        }
-      }
-      if (region.getCoprocessorHost() != null) {
-        r = region.getCoprocessorHost().postIncrement(increment, r);
-      }
-    }
+    long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
+    Result r = region.increment(increment, nonceGroup, nonce);
     final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
     if (metricsRegionServer != null) {
-      metricsRegionServer.updateIncrement(
-          region.getTableDescriptor().getTableName(),
-          EnvironmentEdgeManager.currentTime() - before);
+      metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(),
+        EnvironmentEdgeManager.currentTime() - before);
     }
     return r == null ? Result.EMPTY_RESULT : r;
   }
 
-  private static Get toGet(final Mutation mutation) throws IOException {
-    if(!(mutation instanceof Increment) && !(mutation instanceof Append)) {
-      throw new AssertionError("mutation must be a instance of Increment or Append");
-    }
-    Get get = new Get(mutation.getRow());
-    CellScanner cellScanner = mutation.cellScanner();
-    while (!cellScanner.advance()) {
-      Cell cell = cellScanner.current();
-      get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
-    }
-    if (mutation instanceof Increment) {
-      // Increment
-      Increment increment = (Increment) mutation;
-      get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
-    } else {
-      // Append
-      Append append = (Append) mutation;
-      get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
-    }
-    for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
-      get.setAttribute(entry.getKey(), entry.getValue());
-    }
-    return get;
-  }
-
   /**
    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
@@ -2847,23 +2739,35 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       try {
         if (regionAction.hasCondition()) {
           try {
-            CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
-              cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
-            regionActionResultBuilder.setProcessed(result.isSuccess());
             ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
               ClientProtos.ResultOrException.newBuilder();
-            for (int i = 0; i < regionAction.getActionCount(); i++) {
-              if (i == 0 && result.getResult() != null) {
-                resultOrExceptionOrBuilder.setIndex(i);
-                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
-                  .setResult(ProtobufUtil.toResult(result.getResult())).build());
-                continue;
+            if (regionAction.getActionCount() == 1) {
+              CheckAndMutateResult result = checkAndMutate(region, quota,
+                regionAction.getAction(0).getMutation(), cellScanner,
+                regionAction.getCondition(), spaceQuotaEnforcement);
+              regionActionResultBuilder.setProcessed(result.isSuccess());
+              resultOrExceptionOrBuilder.setIndex(0);
+              if (result.getResult() != null) {
+                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
               }
-              // To unify the response format with doNonAtomicRegionMutation and read through
-              // client's AsyncProcess we have to add an empty result instance per operation
-              resultOrExceptionOrBuilder.clear();
-              resultOrExceptionOrBuilder.setIndex(i);
               regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
+            } else {
+              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
+                cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
+              regionActionResultBuilder.setProcessed(result.isSuccess());
+              for (int i = 0; i < regionAction.getActionCount(); i++) {
+                if (i == 0 && result.getResult() != null) {
+                  resultOrExceptionOrBuilder.setIndex(i);
+                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
+                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
+                  continue;
+                }
+                // To unify the response format with doNonAtomicRegionMutation and read through
+                // client's AsyncProcess we have to add an empty result instance per operation
+                resultOrExceptionOrBuilder.clear();
+                resultOrExceptionOrBuilder.setIndex(i);
+                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
+              }
             }
           } catch (IOException e) {
             rpcServer.getMetrics().exception(e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index d03d19f..64e2f66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -289,7 +289,8 @@ public interface Region extends ConfigurationObserver {
   /**
    * Perform a batch of mutations.
    * <p>
-   * Note this supports only Put and Delete mutations and will ignore other types passed.
+   * Note this supports only Put, Delete, Increment and Append mutations and will ignore other
+   * types passed.
    * @param mutations the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 39e79c1..6cac67a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -1506,8 +1506,11 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
           if (m instanceof Put) {
             checkForReservedTagPresence(user, m);
             opType = OpType.PUT;
-          } else {
+          } else if (m instanceof Delete) {
             opType = OpType.DELETE;
+          } else {
+            // If the operation type is not Put or Delete, do nothing
+            continue;
           }
           AuthResult authResult = null;
           if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index de81750..2944c40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -636,6 +636,7 @@ public class TestAsyncTable {
             successCount.incrementAndGet();
             successIndex.set(i);
           }
+          assertNull(x.getResult());
           latch.countDown();
         }));
     latch.await();
@@ -670,6 +671,7 @@ public class TestAsyncTable {
             successCount.incrementAndGet();
             successIndex.set(i);
           }
+          assertNull(x.getResult());
           deleteLatch.countDown();
         }));
     deleteLatch.await();
@@ -717,6 +719,7 @@ public class TestAsyncTable {
             successCount.incrementAndGet();
             successIndex.set(i);
           }
+          assertNull(x.getResult());
           mutateLatch.countDown();
         });
     });
@@ -743,18 +746,21 @@ public class TestAsyncTable {
       .ifNotExists(FAMILY, QUALIFIER)
       .build(put)).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
       .ifEquals(FAMILY, QUALIFIER, VALUE)
       .timeRange(TimeRange.at(ts + 10000))
       .build(put)).get();
     assertFalse(result.isSuccess());
+    assertNull(result.getResult());
 
     result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
       .ifEquals(FAMILY, QUALIFIER, VALUE)
       .timeRange(TimeRange.at(ts))
       .build(put)).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     RowMutations rm = new RowMutations(row).add((Mutation) put);
 
@@ -763,12 +769,14 @@ public class TestAsyncTable {
       .timeRange(TimeRange.at(ts + 10000))
       .build(rm)).get();
     assertFalse(result.isSuccess());
+    assertNull(result.getResult());
 
     result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
       .ifEquals(FAMILY, QUALIFIER, VALUE)
       .timeRange(TimeRange.at(ts))
       .build(rm)).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
 
@@ -777,12 +785,14 @@ public class TestAsyncTable {
       .timeRange(TimeRange.at(ts + 10000))
       .build(delete)).get();
     assertFalse(result.isSuccess());
+    assertNull(result.getResult());
 
     result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
       .ifEquals(FAMILY, QUALIFIER, VALUE)
       .timeRange(TimeRange.at(ts))
       .build(delete)).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
   }
 
   @Test
@@ -802,6 +812,7 @@ public class TestAsyncTable {
         CompareOperator.EQUAL, Bytes.toBytes("a")))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
     assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -812,6 +823,7 @@ public class TestAsyncTable {
         CompareOperator.EQUAL, Bytes.toBytes("b")))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
     assertFalse(result.isSuccess());
+    assertNull(result.getResult());
 
     assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
 
@@ -821,6 +833,7 @@ public class TestAsyncTable {
         CompareOperator.EQUAL, Bytes.toBytes("a")))
       .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
 
@@ -833,6 +846,7 @@ public class TestAsyncTable {
           .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
         .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
     assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -860,6 +874,7 @@ public class TestAsyncTable {
           Bytes.toBytes("b"))))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
     assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -873,6 +888,7 @@ public class TestAsyncTable {
           Bytes.toBytes("c"))))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
     assertFalse(result.isSuccess());
+    assertNull(result.getResult());
 
     assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
 
@@ -885,6 +901,7 @@ public class TestAsyncTable {
           Bytes.toBytes("b"))))
       .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
 
@@ -900,6 +917,7 @@ public class TestAsyncTable {
           .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
         .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
     assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -922,6 +940,7 @@ public class TestAsyncTable {
         new TimestampsFilter(Collections.singletonList(100L))))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
     assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@@ -934,6 +953,7 @@ public class TestAsyncTable {
         new TimestampsFilter(Collections.singletonList(101L))))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
     assertFalse(result.isSuccess());
+    assertNull(result.getResult());
 
     assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
   }
@@ -953,6 +973,7 @@ public class TestAsyncTable {
       .timeRange(TimeRange.between(0, 101))
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
     assertTrue(result.isSuccess());
+    assertNull(result.getResult());
 
     Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
     assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@@ -965,10 +986,125 @@ public class TestAsyncTable {
       .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))))
       .get();
     assertFalse(result.isSuccess());
+    assertNull(result.getResult());
 
     assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
   }
 
+  @Test
+  public void testCheckAndIncrement() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+    // CheckAndIncrement with correct value
+    CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
+    assertTrue(res.isSuccess());
+    assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // CheckAndIncrement with wrong value
+    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
+    assertFalse(res.isSuccess());
+    assertNull(res.getResult());
+
+    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+
+    // CheckAndIncrement with a filter and correct value
+    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+          Bytes.toBytes("c"))))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
+    assertTrue(res.isSuccess());
+    assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // CheckAndIncrement with a filter and correct value
+    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("b")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+          Bytes.toBytes("d"))))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
+    assertFalse(res.isSuccess());
+    assertNull(res.getResult());
+
+    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+  }
+
+  @Test
+  public void testCheckAndAppend() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+    // CheckAndAppend with correct value
+    CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
+    assertTrue(res.isSuccess());
+    assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // CheckAndAppend with correct value
+    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
+      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
+    assertFalse(res.isSuccess());
+    assertNull(res.getResult());
+
+    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+
+    // CheckAndAppend with a filter and correct value
+    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+          Bytes.toBytes("c"))))
+      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
+    assertTrue(res.isSuccess());
+    assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // CheckAndAppend with a filter and wrong value
+    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("b")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+          Bytes.toBytes("d"))))
+      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
+    assertFalse(res.isSuccess());
+    assertNull(res.getResult());
+
+    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+  }
+
   // Tests for batch version of checkAndMutate
 
   @Test
@@ -997,7 +1133,9 @@ public class TestAsyncTable {
       table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
     assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -1017,7 +1155,9 @@ public class TestAsyncTable {
     results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
 
@@ -1042,7 +1182,9 @@ public class TestAsyncTable {
     results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     result = table.get(new Get(row3)).get();
     assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
@@ -1079,7 +1221,9 @@ public class TestAsyncTable {
       table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
     assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -1099,7 +1243,9 @@ public class TestAsyncTable {
     results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
     assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -1121,7 +1267,9 @@ public class TestAsyncTable {
     results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
     assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -1166,7 +1314,9 @@ public class TestAsyncTable {
       table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
     assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -1194,7 +1344,9 @@ public class TestAsyncTable {
     results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
 
@@ -1227,7 +1379,9 @@ public class TestAsyncTable {
     results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     result = table.get(new Get(row)).get();
     assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
@@ -1273,7 +1427,9 @@ public class TestAsyncTable {
       table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
 
     assertTrue(results.get(0).isSuccess());
+    assertNull(results.get(0).getResult());
     assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
 
     Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
     assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -1283,6 +1439,80 @@ public class TestAsyncTable {
   }
 
   @Test
+  public void testCheckAndIncrementBatch() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
+
+    // CheckAndIncrement with correct value
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
+
+    // CheckAndIncrement with wrong value
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+      .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
+
+    List<CheckAndMutateResult> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+    assertTrue(results.get(0).isSuccess());
+    assertEquals(1, Bytes.toLong(results.get(0).getResult()
+      .getValue(FAMILY, Bytes.toBytes("B"))));
+    assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+    assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
+  }
+
+  @Test
+  public void testCheckAndAppendBatch() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
+
+    // CheckAndAppend with correct value
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+
+    // CheckAndAppend with wrong value
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+      .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+
+    List<CheckAndMutateResult> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+    assertTrue(results.get(0).isSuccess());
+    assertEquals("bb", Bytes.toString(results.get(0).getResult()
+      .getValue(FAMILY, Bytes.toBytes("B"))));
+    assertFalse(results.get(1).isSuccess());
+    assertNull(results.get(1).getResult());
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+  }
+
+  @Test
   public void testDisabled() throws InterruptedException, ExecutionException {
     ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index cb53f30..20ec40e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -344,13 +344,17 @@ public class TestAsyncTableBatch {
     byte[] row3 = Bytes.toBytes("row3");
     byte[] row4 = Bytes.toBytes("row4");
     byte[] row5 = Bytes.toBytes("row5");
+    byte[] row6 = Bytes.toBytes("row6");
+    byte[] row7 = Bytes.toBytes("row7");
 
     table.putAll(Arrays.asList(
       new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
       new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
       new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
       new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
-      new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
+      new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
+      new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
+      new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))).get();
 
     CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
       .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
@@ -363,17 +367,36 @@ public class TestAsyncTableBatch {
       .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
       .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
     Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
-
-    List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
+    CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
+      .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
+      .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
+    CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
+      .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
+      .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
+
+    List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
+      checkAndMutate3, checkAndMutate4);
     List<Object> results = table.batchAll(actions).get();
 
     assertTrue(((CheckAndMutateResult) results.get(0)).isSuccess());
+    assertNull(((CheckAndMutateResult) results.get(0)).getResult());
     assertEquals("b",
       Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B"))));
     assertTrue(((Result) results.get(2)).getExists());
     assertFalse(((CheckAndMutateResult) results.get(3)).isSuccess());
+    assertNull(((CheckAndMutateResult) results.get(3)).getResult());
     assertTrue(((Result) results.get(4)).isEmpty());
 
+    CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(5);
+    assertTrue(checkAndMutateResult.isSuccess());
+    assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
+      .getValue(FAMILY, Bytes.toBytes("F"))));
+
+    checkAndMutateResult = (CheckAndMutateResult) results.get(6);
+    assertTrue(checkAndMutateResult.isSuccess());
+    assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
+      .getValue(FAMILY, Bytes.toBytes("G"))));
+
     Result result = table.get(new Get(row1)).get();
     assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
 
@@ -386,5 +409,11 @@ public class TestAsyncTableBatch {
 
     result = table.get(new Get(row5)).get();
     assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
+
+    result = table.get(new Get(row6)).get();
+    assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
+
+    result = table.get(new Get(row7)).get();
+    assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
index 72524c4..a4b79c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
@@ -372,6 +372,7 @@ public class TestCheckAndMutate {
         .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
         .build(rm));
       assertTrue(res.isSuccess());
+      assertNull(res.getResult());
 
       // get row back and assert the values
       getOneRowAndAssertAllButCExist(table);
@@ -407,6 +408,7 @@ public class TestCheckAndMutate {
           Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
         .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
       assertTrue(result.isSuccess());
+      assertNull(result.getResult());
 
       Result r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
       assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -417,6 +419,7 @@ public class TestCheckAndMutate {
         CompareOperator.EQUAL, Bytes.toBytes("b")))
         .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
       assertFalse(result.isSuccess());
+      assertNull(result.getResult());
 
       assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
 
@@ -426,6 +429,7 @@ public class TestCheckAndMutate {
           CompareOperator.EQUAL, Bytes.toBytes("a")))
         .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
       assertTrue(result.isSuccess());
+      assertNull(result.getResult());
 
       assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
 
@@ -438,6 +442,7 @@ public class TestCheckAndMutate {
             .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
           .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
       assertTrue(result.isSuccess());
+      assertNull(result.getResult());
 
       r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
       assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -463,6 +468,7 @@ public class TestCheckAndMutate {
             Bytes.toBytes("b"))))
         .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
       assertTrue(result.isSuccess());
+      assertNull(result.getResult());
 
       Result r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
       assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -476,6 +482,7 @@ public class TestCheckAndMutate {
             Bytes.toBytes("c"))))
         .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
       assertFalse(result.isSuccess());
+      assertNull(result.getResult());
 
       assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
 
@@ -488,6 +495,7 @@ public class TestCheckAndMutate {
               Bytes.toBytes("b"))))
         .build(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
       assertTrue(result.isSuccess());
+      assertNull(result.getResult());
 
       assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
 
@@ -503,6 +511,7 @@ public class TestCheckAndMutate {
             .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
           .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
       assertTrue(result.isSuccess());
+      assertNull(result.getResult());
 
       r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
       assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -525,6 +534,7 @@ public class TestCheckAndMutate {
           new TimestampsFilter(Collections.singletonList(100L))))
         .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
       assertTrue(result.isSuccess());
+      assertNull(result.getResult());
 
       Result r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
       assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@@ -537,6 +547,7 @@ public class TestCheckAndMutate {
           new TimestampsFilter(Collections.singletonList(101L))))
         .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
       assertFalse(result.isSuccess());
+      assertNull(result.getResult());
 
       assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
     }
@@ -555,6 +566,7 @@ public class TestCheckAndMutate {
         .timeRange(TimeRange.between(0, 101))
         .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
       assertTrue(result.isSuccess());
+      assertNull(result.getResult());
 
       Result r = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
       assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@@ -566,6 +578,7 @@ public class TestCheckAndMutate {
         .timeRange(TimeRange.between(0, 100))
         .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
       assertFalse(result.isSuccess());
+      assertNull(result.getResult());
 
       assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
     }
@@ -577,6 +590,120 @@ public class TestCheckAndMutate {
       .build(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
   }
 
+  @Test
+  public void testCheckAndIncrement() throws Throwable {
+    try (Table table = createTable()) {
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+      // CheckAndIncrement with correct value
+      CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 1)));
+      assertTrue(res.isSuccess());
+      assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      // CheckAndIncrement with wrong value
+      res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
+        .build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 1)));
+      assertFalse(res.isSuccess());
+      assertNull(res.getResult());
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+
+      // CheckAndIncrement with a filter and correct value
+      res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+            Bytes.toBytes("c"))))
+        .build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 2)));
+      assertTrue(res.isSuccess());
+      assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      // CheckAndIncrement with a filter and correct value
+      res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("b")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+            Bytes.toBytes("d"))))
+        .build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 2)));
+      assertFalse(res.isSuccess());
+      assertNull(res.getResult());
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndAppend() throws Throwable {
+    try (Table table = createTable()) {
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+      // CheckAndAppend with correct value
+      CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+      assertTrue(res.isSuccess());
+      assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      // CheckAndAppend with correct value
+      res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
+        .build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+      assertFalse(res.isSuccess());
+      assertNull(res.getResult());
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+
+      // CheckAndAppend with a filter and correct value
+      res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+            Bytes.toBytes("c"))))
+        .build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb"))));
+      assertTrue(res.isSuccess());
+      assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      // CheckAndAppend with a filter and wrong value
+      res = table.checkAndMutate(CheckAndMutate.newBuilder(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("b")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+            Bytes.toBytes("d"))))
+        .build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb"))));
+      assertFalse(res.isSuccess());
+      assertNull(res.getResult());
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+    }
+  }
+
   // Tests for batch version of checkAndMutate
 
   @Test
@@ -601,7 +728,9 @@ public class TestCheckAndMutate {
         table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
       assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -621,7 +750,9 @@ public class TestCheckAndMutate {
       results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
 
@@ -646,7 +777,9 @@ public class TestCheckAndMutate {
       results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       result = table.get(new Get(ROWKEY3));
       assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
@@ -680,7 +813,9 @@ public class TestCheckAndMutate {
         table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
       assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -700,7 +835,9 @@ public class TestCheckAndMutate {
       results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A")));
       assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -722,7 +859,9 @@ public class TestCheckAndMutate {
       results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       result = table.get(new Get(ROWKEY3).addColumn(FAMILY, Bytes.toBytes("C")));
       assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -766,7 +905,9 @@ public class TestCheckAndMutate {
         table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")));
       assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -794,7 +935,9 @@ public class TestCheckAndMutate {
       results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
 
@@ -827,7 +970,9 @@ public class TestCheckAndMutate {
       results = table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       result = table.get(new Get(ROWKEY));
       assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
@@ -872,7 +1017,9 @@ public class TestCheckAndMutate {
         table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
 
       assertTrue(results.get(0).isSuccess());
+      assertNull(results.get(0).getResult());
       assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
 
       Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C")));
       assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -881,4 +1028,76 @@ public class TestCheckAndMutate {
       assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
     }
   }
+
+  @Test
+  public void testCheckAndIncrementBatch() throws Throwable {
+    try (Table table = createTable()) {
+      table.put(Arrays.asList(
+        new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+          .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
+        new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L))));
+
+      // CheckAndIncrement with correct value
+      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+          .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+          .build(new Increment(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), 1));
+
+      // CheckAndIncrement with wrong value
+      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+          .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+          .build(new Increment(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
+
+      List<CheckAndMutateResult> results =
+        table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+      assertTrue(results.get(0).isSuccess());
+      assertEquals(1, Bytes.toLong(results.get(0).getResult()
+        .getValue(FAMILY, Bytes.toBytes("B"))));
+      assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndAppendBatch() throws Throwable {
+    try (Table table = createTable()) {
+      table.put(Arrays.asList(
+        new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+          .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+        new Put(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+
+      // CheckAndAppend with correct value
+      CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .build(new Append(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+
+      // CheckAndAppend with wrong value
+      CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(ROWKEY2)
+        .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+        .build(new Append(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+
+      List<CheckAndMutateResult> results =
+        table.checkAndMutate(Arrays.asList(checkAndMutate1, checkAndMutate2));
+
+      assertTrue(results.get(0).isSuccess());
+      assertEquals("bb", Bytes.toString(results.get(0).getResult()
+        .getValue(FAMILY, Bytes.toBytes("B"))));
+      assertFalse(results.get(1).isSuccess());
+      assertNull(results.get(1).getResult());
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      result = table.get(new Get(ROWKEY2).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index f14b0a0..3397b8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -492,13 +492,17 @@ public class TestFromClientSide3 {
       byte[] row3 = Bytes.toBytes("row3");
       byte[] row4 = Bytes.toBytes("row4");
       byte[] row5 = Bytes.toBytes("row5");
+      byte[] row6 = Bytes.toBytes("row6");
+      byte[] row7 = Bytes.toBytes("row7");
 
       table.put(Arrays.asList(
         new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
         new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
         new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
         new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
-        new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
+        new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
+        new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
+        new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
 
       CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
         .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
@@ -511,18 +515,37 @@ public class TestFromClientSide3 {
         .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
         .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
       Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
-
-      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
+      CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
+        .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
+        .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
+      CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
+        .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
+        .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
+
+      List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
+        checkAndMutate3, checkAndMutate4);
       Object[] results = new Object[actions.size()];
       table.batch(actions, results);
 
       assertTrue(((CheckAndMutateResult) results[0]).isSuccess());
+      assertNull(((CheckAndMutateResult) results[0]).getResult());
       assertEquals("b",
         Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
       assertTrue(((Result) results[2]).getExists());
       assertFalse(((CheckAndMutateResult) results[3]).isSuccess());
+      assertNull(((CheckAndMutateResult) results[3]).getResult());
       assertTrue(((Result) results[4]).isEmpty());
 
+      CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[5];
+      assertTrue(checkAndMutateResult.isSuccess());
+      assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
+        .getValue(FAMILY, Bytes.toBytes("F"))));
+
+      checkAndMutateResult = (CheckAndMutateResult) results[6];
+      assertTrue(checkAndMutateResult.isSuccess());
+      assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
+        .getValue(FAMILY, Bytes.toBytes("G"))));
+
       Result result = table.get(new Get(row1));
       assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
 
@@ -535,6 +558,12 @@ public class TestFromClientSide3 {
 
       result = table.get(new Get(row5));
       assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
+
+      result = table.get(new Get(row6));
+      assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
+
+      result = table.get(new Get(row7));
+      assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 65bcd14..0707a53 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -650,7 +650,7 @@ public class TestAtomicOperation {
     put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
     puts[0] = put;
 
-    region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    region.batchMutate(puts);
     MultithreadedTestUtil.TestContext ctx =
       new MultithreadedTestUtil.TestContext(conf);
     ctx.addThread(new PutThread(ctx, region));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 6dbdaf4..3c004b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -2327,8 +2327,8 @@ public class TestHRegion {
         new Put(wrongRow).addColumn(fam1, qual1, value1));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
-        + "match the original one <rowA>", e.getMessage());
+      assertEquals("The row of the action <wrongRow> doesn't match the original one <rowA>",
+        e.getMessage());
     }
 
     try {
@@ -2337,8 +2337,8 @@ public class TestHRegion {
         new Put(wrongRow).addColumn(fam1, qual1, value1));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
-        + "match the original one <rowA>", e.getMessage());
+      assertEquals("The row of the action <wrongRow> doesn't match the original one <rowA>",
+        e.getMessage());
     }
 
     try {
@@ -2350,8 +2350,8 @@ public class TestHRegion {
           .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
-        + "match the original one <rowA>", e.getMessage());
+      assertEquals("The row of the action <wrongRow> doesn't match the original one <rowA>",
+        e.getMessage());
     }
 
     try {
@@ -2363,8 +2363,8 @@ public class TestHRegion {
           .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
-        + "match the original one <rowA>", e.getMessage());
+      assertEquals("The row of the action <wrongRow> doesn't match the original one <rowA>",
+        e.getMessage());
     }
   }
 
@@ -2851,6 +2851,127 @@ public class TestHRegion {
     assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
   }
 
+  @Test
+  public void testCheckAndIncrement() throws Throwable {
+    final byte[] FAMILY = Bytes.toBytes("fam");
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+    region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+    // CheckAndIncrement with correct value
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)));
+    assertTrue(res.isSuccess());
+    assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // CheckAndIncrement with wrong value
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)));
+    assertFalse(res.isSuccess());
+    assertNull(res.getResult());
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+
+    // CheckAndIncrement with a filter and correct value
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+          Bytes.toBytes("c"))))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2)));
+    assertTrue(res.isSuccess());
+    assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // CheckAndIncrement with a filter and correct value
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("b")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+          Bytes.toBytes("d"))))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2)));
+    assertFalse(res.isSuccess());
+    assertNull(res.getResult());
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+  }
+
+  @Test
+  public void testCheckAndAppend() throws Throwable {
+    final byte[] FAMILY = Bytes.toBytes("fam");
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+    region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+    // CheckAndAppend with correct value
+    CheckAndMutateResult res =
+      region.checkAndMutate(CheckAndMutate.newBuilder(row)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+    assertTrue(res.isSuccess());
+    assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // CheckAndAppend with wrong value
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
+      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+    assertFalse(res.isSuccess());
+    assertNull(res.getResult());
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+
+    // CheckAndAppend with a filter and correct value
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+          Bytes.toBytes("c"))))
+      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb"))));
+    assertTrue(res.isSuccess());
+    assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // CheckAndAppend with a filter and wrong value
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("b")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+          Bytes.toBytes("d"))))
+      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb"))));
+    assertFalse(res.isSuccess());
+    assertNull(res.getResult());
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+  }
+
   // ////////////////////////////////////////////////////////////////////////////
   // Delete tests
   // ////////////////////////////////////////////////////////////////////////////