You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2020/08/10 09:57:53 UTC

[hbase] branch branch-2 updated: HBASE-24680 Refactor the checkAndMutate code on the server side (#2184)

This is an automated email from the ASF dual-hosted git repository.

brfrn169 pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 22bf9a3  HBASE-24680 Refactor the checkAndMutate code on the server side (#2184)
22bf9a3 is described below

commit 22bf9a38c97f73bc1507c7de86af032500ead1ac
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Mon Aug 10 18:57:17 2020 +0900

    HBASE-24680 Refactor the checkAndMutate code on the server side (#2184)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../apache/hadoop/hbase/client/CheckAndMutate.java |  11 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  71 +++
 .../regionserver/MetricsRegionServerSource.java    |   7 +
 .../MetricsRegionServerSourceImpl.java             |   7 +
 .../hadoop/hbase/coprocessor/RegionObserver.java   | 189 ++++++++
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 133 ++++--
 .../hbase/regionserver/MetricsRegionServer.java    |   4 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 428 +++++++----------
 .../apache/hadoop/hbase/regionserver/Region.java   |  44 ++
 .../hbase/regionserver/RegionCoprocessorHost.java  | 311 ++-----------
 .../hbase/coprocessor/SimpleRegionObserver.java    |  39 ++
 .../coprocessor/TestRegionObserverInterface.java   |  73 ++-
 .../hadoop/hbase/regionserver/TestHRegion.java     | 512 ++++++++++++++++++++-
 .../regionserver/TestMetricsRegionServer.java      |   3 +-
 14 files changed, 1213 insertions(+), 619 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
index 5fce3e2..26eb23d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
@@ -214,7 +214,7 @@ public final class CheckAndMutate extends Mutation {
     this.op = op;
     this.value = value;
     this.filter = null;
-    this.timeRange = timeRange;
+    this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
     this.action = action;
   }
 
@@ -225,7 +225,7 @@ public final class CheckAndMutate extends Mutation {
     this.op = null;
     this.value = null;
     this.filter = filter;
-    this.timeRange = timeRange;
+    this.timeRange = timeRange != null ? timeRange : TimeRange.allTime();
     this.action = action;
   }
 
@@ -265,6 +265,13 @@ public final class CheckAndMutate extends Mutation {
   }
 
   /**
+   * @return whether this has a filter or not
+   */
+  public boolean hasFilter() {
+    return filter != null;
+  }
+
+  /**
    * @return the time range to check
    */
   public TimeRange getTimeRange() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 09db446..c82243a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ExtendedCellBuilder;
 import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
 import org.apache.hadoop.hbase.client.ClientUtil;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -84,6 +86,7 @@ import org.apache.hadoop.hbase.client.RegionLoadStats;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.SlowLogParams;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
@@ -3469,4 +3472,72 @@ public final class ProtobufUtil {
     return clearSlowLogResponses.getIsCleaned();
   }
 
+  public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
+    MutationProto mutation, CellScanner cellScanner) throws IOException {
+    byte[] row = condition.getRow().toByteArray();
+    CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row);
+    Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
+    if (filter != null) {
+      builder.ifMatches(filter);
+    } else {
+      builder.ifMatches(condition.getFamily().toByteArray(),
+        condition.getQualifier().toByteArray(),
+        CompareOperator.valueOf(condition.getCompareType().name()),
+        ProtobufUtil.toComparator(condition.getComparator()).getValue());
+    }
+    TimeRange timeRange = condition.hasTimeRange() ?
+      ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
+    builder.timeRange(timeRange);
+
+    try {
+      MutationType type = mutation.getMutateType();
+      switch (type) {
+        case PUT:
+          return builder.build(ProtobufUtil.toPut(mutation, cellScanner));
+        case DELETE:
+          return builder.build(ProtobufUtil.toDelete(mutation, cellScanner));
+        default:
+          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+      }
+    } catch (IllegalArgumentException e) {
+      throw new DoNotRetryIOException(e.getMessage());
+    }
+  }
+
+  public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
+    List<Mutation> mutations) throws IOException {
+    assert mutations.size() > 0;
+    byte[] row = condition.getRow().toByteArray();
+    CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row);
+    Filter filter = condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
+    if (filter != null) {
+      builder.ifMatches(filter);
+    } else {
+      builder.ifMatches(condition.getFamily().toByteArray(),
+        condition.getQualifier().toByteArray(),
+        CompareOperator.valueOf(condition.getCompareType().name()),
+        ProtobufUtil.toComparator(condition.getComparator()).getValue());
+    }
+    TimeRange timeRange = condition.hasTimeRange() ?
+      ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
+    builder.timeRange(timeRange);
+
+    try {
+      if (mutations.size() == 1) {
+        Mutation m = mutations.get(0);
+        if (m instanceof Put) {
+          return builder.build((Put) m);
+        } else if (m instanceof Delete) {
+          return builder.build((Delete) m);
+        } else {
+          throw new DoNotRetryIOException("Unsupported mutate type: " + mutations.get(0)
+            .getClass().getSimpleName().toUpperCase());
+        }
+      } else {
+        return builder.build(new RowMutations(mutations.get(0).getRow()).add(mutations));
+      }
+    } catch (IllegalArgumentException e) {
+      throw new DoNotRetryIOException(e.getMessage());
+    }
+  }
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 958495a..ce9319e 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -87,6 +87,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
   void updateCheckAndPut(long t);
 
   /**
+   * Update checkAndMutate histogram
+   * @param t time it took
+   */
+  void updateCheckAndMutate(long t);
+
+  /**
    * Update the Get time histogram .
    *
    * @param t time it took
@@ -393,6 +399,7 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
   String DELETE_KEY = "delete";
   String CHECK_AND_DELETE_KEY = "checkAndDelete";
   String CHECK_AND_PUT_KEY = "checkAndPut";
+  String CHECK_AND_MUTATE_KEY = "checkAndMutate";
   String DELETE_BATCH_KEY = "deleteBatch";
   String GET_SIZE_KEY = "getSize";
   String GET_KEY = "get";
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 4af8bec..e8d7f36 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -42,6 +42,7 @@ public class MetricsRegionServerSourceImpl
   private final MetricHistogram deleteBatchHisto;
   private final MetricHistogram checkAndDeleteHisto;
   private final MetricHistogram checkAndPutHisto;
+  private final MetricHistogram checkAndMutateHisto;
   private final MetricHistogram getHisto;
   private final MetricHistogram incrementHisto;
   private final MetricHistogram appendHisto;
@@ -112,6 +113,7 @@ public class MetricsRegionServerSourceImpl
     deleteBatchHisto = getMetricsRegistry().newTimeHistogram(DELETE_BATCH_KEY);
     checkAndDeleteHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_DELETE_KEY);
     checkAndPutHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_PUT_KEY);
+    checkAndMutateHisto = getMetricsRegistry().newTimeHistogram(CHECK_AND_MUTATE_KEY);
 
     getHisto = getMetricsRegistry().newTimeHistogram(GET_KEY);
     slowGet = getMetricsRegistry().newCounter(SLOW_GET_KEY, SLOW_GET_DESC, 0L);
@@ -615,6 +617,11 @@ public class MetricsRegionServerSourceImpl
   }
 
   @Override
+  public void updateCheckAndMutate(long t) {
+    checkAndMutateHisto.add(t);
+  }
+
+  @Override
   public void updatePutBatch(long t) {
     putBatchHisto.add(t);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 05071fc..0bc0631 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -514,7 +517,11 @@ public interface RegionObserver {
    * @param put data to put if check succeeds
    * @param result the default value of the result
    * @return the return value to return to client if bypassing default processing
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
    */
+  @Deprecated
   default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
       byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
       boolean result) throws IOException {
@@ -535,7 +542,11 @@ public interface RegionObserver {
    * @param put data to put if check succeeds
    * @param result the default value of the result
    * @return the return value to return to client if bypassing default processing
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
    */
+  @Deprecated
   default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
     Filter filter, Put put, boolean result) throws IOException {
     return result;
@@ -562,7 +573,12 @@ public interface RegionObserver {
    * @param put data to put if check succeeds
    * @param result the default value of the result
    * @return the return value to return to client if bypassing default processing
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
+   *   instead.
    */
+  @Deprecated
   default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
       byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
       ByteArrayComparable comparator, Put put, boolean result) throws IOException {
@@ -587,7 +603,12 @@ public interface RegionObserver {
    * @param put data to put if check succeeds
    * @param result the default value of the result
    * @return the return value to return to client if bypassing default processing
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
+   *   instead.
    */
+  @Deprecated
   default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
     byte[] row, Filter filter, Put put, boolean result) throws IOException {
     return result;
@@ -607,7 +628,11 @@ public interface RegionObserver {
    * @param put data to put if check succeeds
    * @param result from the checkAndPut
    * @return the possibly transformed return value to return to client
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
    */
+  @Deprecated
   default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
       byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
       boolean result) throws IOException {
@@ -625,7 +650,11 @@ public interface RegionObserver {
    * @param put data to put if check succeeds
    * @param result from the checkAndPut
    * @return the possibly transformed return value to return to client
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
    */
+  @Deprecated
   default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
     Filter filter, Put put, boolean result) throws IOException {
     return result;
@@ -648,7 +677,11 @@ public interface RegionObserver {
    * @param delete delete to commit if check succeeds
    * @param result the default value of the result
    * @return the value to return to client if bypassing default processing
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
    */
+  @Deprecated
   default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
       byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
       Delete delete, boolean result) throws IOException {
@@ -669,7 +702,11 @@ public interface RegionObserver {
    * @param delete delete to commit if check succeeds
    * @param result the default value of the result
    * @return the value to return to client if bypassing default processing
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #preCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
    */
+  @Deprecated
   default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
     Filter filter, Delete delete, boolean result) throws IOException {
     return result;
@@ -696,7 +733,12 @@ public interface RegionObserver {
    * @param delete delete to commit if check succeeds
    * @param result the default value of the result
    * @return the value to return to client if bypassing default processing
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
+   *   instead.
    */
+  @Deprecated
   default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
       byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
       ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
@@ -721,7 +763,12 @@ public interface RegionObserver {
    * @param delete delete to commit if check succeeds
    * @param result the default value of the result
    * @return the value to return to client if bypassing default processing
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #preCheckAndMutateAfterRowLock(ObserverContext, CheckAndMutate,CheckAndMutateResult)}
+   *   instead.
    */
+  @Deprecated
   default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
     byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
     return result;
@@ -741,7 +788,11 @@ public interface RegionObserver {
    * @param delete delete to commit if check succeeds
    * @param result from the CheckAndDelete
    * @return the possibly transformed returned value to return to client
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
    */
+  @Deprecated
   default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
       byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
       Delete delete, boolean result) throws IOException {
@@ -759,13 +810,151 @@ public interface RegionObserver {
    * @param delete delete to commit if check succeeds
    * @param result from the CheckAndDelete
    * @return the possibly transformed returned value to return to client
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #postCheckAndMutate(ObserverContext, CheckAndMutate, CheckAndMutateResult)} instead.
    */
+  @Deprecated
   default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
     Filter filter, Delete delete, boolean result) throws IOException {
     return result;
   }
 
   /**
+   * Called before checkAndMutate
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions.
+   * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+   * <p>
+   * Note: Do not retain references to any Cells in actions beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param checkAndMutate the CheckAndMutate object
+   * @param result the default value of the result
+   * @return the return value to return to client if bypassing default processing
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  default CheckAndMutateResult preCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+    CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
+    if (checkAndMutate.getAction() instanceof Put) {
+      boolean success;
+      if (checkAndMutate.hasFilter()) {
+        success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFilter(),
+          (Put) checkAndMutate.getAction(), result.isSuccess());
+      } else {
+        success = preCheckAndPut(c, checkAndMutate.getRow(), checkAndMutate.getFamily(),
+          checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(),
+          new BinaryComparator(checkAndMutate.getValue()), (Put) checkAndMutate.getAction(),
+          result.isSuccess());
+      }
+      return new CheckAndMutateResult(success, null);
+    } else if (checkAndMutate.getAction() instanceof Delete) {
+      boolean success;
+      if (checkAndMutate.hasFilter()) {
+        success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFilter(),
+          (Delete) checkAndMutate.getAction(), result.isSuccess());
+      } else {
+        success = preCheckAndDelete(c, checkAndMutate.getRow(), checkAndMutate.getFamily(),
+          checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(),
+          new BinaryComparator(checkAndMutate.getValue()), (Delete) checkAndMutate.getAction(),
+          result.isSuccess());
+      }
+      return new CheckAndMutateResult(success, null);
+    }
+    return result;
+  }
+
+  /**
+   * Called before checkAndMutate but after acquiring rowlock.
+   * <p>
+   * <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
+   * Row will be locked for longer time. Trying to acquire lock on another row, within this,
+   * can lead to potential deadlock.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions.
+   * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+   * <p>
+   * Note: Do not retain references to any Cells in actions beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param checkAndMutate the CheckAndMutate object
+   * @param result the default value of the result
+   * @return the value to return to client if bypassing default processing
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  default CheckAndMutateResult preCheckAndMutateAfterRowLock(
+    ObserverContext<RegionCoprocessorEnvironment> c, CheckAndMutate checkAndMutate,
+    CheckAndMutateResult result) throws IOException {
+    if (checkAndMutate.getAction() instanceof Put) {
+      boolean success;
+      if (checkAndMutate.hasFilter()) {
+        success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(),
+          checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess());
+      } else {
+        success = preCheckAndPutAfterRowLock(c, checkAndMutate.getRow(),
+          checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+          checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
+          (Put) checkAndMutate.getAction(), result.isSuccess());
+      }
+      return new CheckAndMutateResult(success, null);
+    } else if (checkAndMutate.getAction() instanceof Delete) {
+      boolean success;
+      if (checkAndMutate.hasFilter()) {
+        success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(),
+          checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess());
+      } else {
+        success = preCheckAndDeleteAfterRowLock(c, checkAndMutate.getRow(),
+          checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+          checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
+          (Delete) checkAndMutate.getAction(), result.isSuccess());
+      }
+      return new CheckAndMutateResult(success, null);
+    }
+    return result;
+  }
+
+  /**
+   * Called after checkAndMutate
+   * <p>
+   * Note: Do not retain references to any Cells in actions beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param checkAndMutate the CheckAndMutate object
+   * @param result from the checkAndMutate
+   * @return the possibly transformed returned value to return to client
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  default CheckAndMutateResult postCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+    CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
+    if (checkAndMutate.getAction() instanceof Put) {
+      boolean success;
+      if (checkAndMutate.hasFilter()) {
+        success = postCheckAndPut(c, checkAndMutate.getRow(),
+          checkAndMutate.getFilter(), (Put) checkAndMutate.getAction(), result.isSuccess());
+      } else {
+        success = postCheckAndPut(c, checkAndMutate.getRow(),
+          checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+          checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
+          (Put) checkAndMutate.getAction(), result.isSuccess());
+      }
+      return new CheckAndMutateResult(success, null);
+    } else if (checkAndMutate.getAction() instanceof Delete) {
+      boolean success;
+      if (checkAndMutate.hasFilter()) {
+        success = postCheckAndDelete(c, checkAndMutate.getRow(),
+          checkAndMutate.getFilter(), (Delete) checkAndMutate.getAction(), result.isSuccess());
+      } else {
+        success = postCheckAndDelete(c, checkAndMutate.getRow(),
+          checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
+          checkAndMutate.getCompareOp(), new BinaryComparator(checkAndMutate.getValue()),
+          (Delete) checkAndMutate.getAction(), result.isSuccess());
+      }
+      return new CheckAndMutateResult(success, null);
+    }
+    return result;
+  }
+
+  /**
    * Called before Append.
    * <p>
    * Call CoprocessorEnvironment#bypass to skip default actions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f62da4c..2cffa32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -103,6 +103,8 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Delete;
@@ -129,6 +131,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
@@ -4249,43 +4252,103 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   @Override
+  @Deprecated
   public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
     ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
-    return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null,
-      mutation);
+    CheckAndMutate checkAndMutate;
+    try {
+      CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row)
+        .ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange);
+      if (mutation instanceof Put) {
+        checkAndMutate = builder.build((Put) mutation);
+      } else if (mutation instanceof Delete) {
+        checkAndMutate = builder.build((Delete) mutation);
+      } else {
+        throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass()
+          .getSimpleName().toUpperCase());
+      }
+    } catch (IllegalArgumentException e) {
+      throw new DoNotRetryIOException(e.getMessage());
+    }
+    return checkAndMutate(checkAndMutate).isSuccess();
   }
 
   @Override
+  @Deprecated
   public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation)
     throws IOException {
-    return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation);
+    CheckAndMutate checkAndMutate;
+    try {
+      CheckAndMutate.Builder builder = CheckAndMutate.newBuilder(row).ifMatches(filter)
+        .timeRange(timeRange);
+      if (mutation instanceof Put) {
+        checkAndMutate = builder.build((Put) mutation);
+      } else if (mutation instanceof Delete) {
+        checkAndMutate = builder.build((Delete) mutation);
+      } else {
+        throw new DoNotRetryIOException("Unsupported mutate type: " + mutation.getClass()
+          .getSimpleName().toUpperCase());
+      }
+    } catch (IllegalArgumentException e) {
+      throw new DoNotRetryIOException(e.getMessage());
+    }
+    return checkAndMutate(checkAndMutate).isSuccess();
   }
 
   @Override
+  @Deprecated
   public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
     ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
-    return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null);
+    CheckAndMutate checkAndMutate;
+    try {
+      checkAndMutate = CheckAndMutate.newBuilder(row)
+        .ifMatches(family, qualifier, op, comparator.getValue()).timeRange(timeRange).build(rm);
+    } catch (IllegalArgumentException e) {
+      throw new DoNotRetryIOException(e.getMessage());
+    }
+    return checkAndMutate(checkAndMutate).isSuccess();
   }
 
   @Override
+  @Deprecated
   public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm)
     throws IOException {
-    return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null);
+    CheckAndMutate checkAndMutate;
+    try {
+      checkAndMutate = CheckAndMutate.newBuilder(row).ifMatches(filter).timeRange(timeRange)
+        .build(rm);
+    } catch (IllegalArgumentException e) {
+      throw new DoNotRetryIOException(e.getMessage());
+    }
+    return checkAndMutate(checkAndMutate).isSuccess();
   }
 
-  /**
-   * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
-   * switches in the few places where there is deviation.
-   */
-  private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
-    CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
-    RowMutations rowMutations, Mutation mutation)
-  throws IOException {
-    // Could do the below checks but seems wacky with two callers only. Just comment out for now.
-    // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
-    // need these commented out checks.
-    // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null");
-    // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set");
+  @Override
+  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+    byte[] row = checkAndMutate.getRow();
+    Filter filter = null;
+    byte[] family = null;
+    byte[] qualifier = null;
+    CompareOperator op = null;
+    ByteArrayComparable comparator = null;
+    if (checkAndMutate.hasFilter()) {
+      filter = checkAndMutate.getFilter();
+    } else {
+      family = checkAndMutate.getFamily();
+      qualifier = checkAndMutate.getQualifier();
+      op = checkAndMutate.getCompareOp();
+      comparator = new BinaryComparator(checkAndMutate.getValue());
+    }
+    TimeRange timeRange = checkAndMutate.getTimeRange();
+
+    Mutation mutation = null;
+    RowMutations rowMutations = null;
+    if (checkAndMutate.getAction() instanceof Mutation) {
+      mutation = (Mutation) checkAndMutate.getAction();
+    } else {
+      rowMutations = (RowMutations) checkAndMutate.getAction();
+    }
+
     if (mutation != null) {
       checkMutationType(mutation);
       checkRow(mutation, row);
@@ -4312,32 +4375,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       checkRow(row, "doCheckAndRowMutate");
       RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
       try {
-        if (mutation != null && this.getCoprocessorHost() != null) {
-          // Call coprocessor.
-          Boolean processed = null;
-          if (mutation instanceof Put) {
-            if (filter != null) {
-              processed = this.getCoprocessorHost()
-                .preCheckAndPutAfterRowLock(row, filter, (Put) mutation);
-            } else {
-              processed = this.getCoprocessorHost()
-                .preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator,
-                  (Put) mutation);
-            }
-          } else if (mutation instanceof Delete) {
-            if (filter != null) {
-              processed = this.getCoprocessorHost()
-                .preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation);
-            } else {
-              processed = this.getCoprocessorHost()
-                .preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator,
-                  (Delete) mutation);
-            }
-          }
-          if (processed != null) {
-            return processed;
+        if (this.getCoprocessorHost() != null) {
+          CheckAndMutateResult result =
+            getCoprocessorHost().preCheckAndMutateAfterRowLock(checkAndMutate);
+          if (result != null) {
+            return result;
           }
         }
+
         // NOTE: We used to wait here until mvcc caught up:  mvcc.await();
         // Supposition is that now all changes are done under row locks, then when we go to read,
         // we'll get the latest on this row.
@@ -4395,10 +4440,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             mutateRow(rowMutations);
           }
           this.checkAndMutateChecksPassed.increment();
-          return true;
+          return new CheckAndMutateResult(true, null);
         }
         this.checkAndMutateChecksFailed.increment();
-        return false;
+        return new CheckAndMutateResult(false, null);
       } finally {
         rowLock.release();
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index a66ae00..afb249d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -153,6 +153,10 @@ public class MetricsRegionServer {
     serverSource.updateCheckAndPut(t);
   }
 
+  public void updateCheckAndMutate(long t) {
+    serverSource.updateCheckAndMutate(t);
+  }
+
   public void updateGet(TableName tn, long t) {
     if (tableMetrics != null && tn != null) {
       tableMetrics.updateGet(tn, t);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bc00ab2..7fe7f19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -67,6 +66,8 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -78,7 +79,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
@@ -87,10 +87,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -620,62 +617,55 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  /**
-   * Mutate a list of rows atomically.
-   * @param cellScanner if non-null, the mutation data -- the Cell content.
-   */
-  private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
-    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
-    CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
-    RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
+  private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Action> actions,
+    CellScanner cellScanner, Condition condition,ActivePolicyEnforcement spaceQuotaEnforcement)
     throws IOException {
     int countOfCompleteMutation = 0;
     try {
       if (!region.getRegionInfo().isMetaRegion()) {
         regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
       }
-      RowMutations rm = null;
-      int i = 0;
-      ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
-        ClientProtos.ResultOrException.newBuilder();
+      List<Mutation> mutations = new ArrayList<>();
       for (ClientProtos.Action action: actions) {
         if (action.hasGet()) {
           throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
             action.getGet());
         }
         MutationType type = action.getMutation().getMutateType();
-        if (rm == null) {
-          rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
-        }
         switch (type) {
           case PUT:
             Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
             ++countOfCompleteMutation;
             checkCellSizeLimit(region, put);
             spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
-            rm.add(put);
+            mutations.add(put);
             break;
           case DELETE:
             Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
             ++countOfCompleteMutation;
             spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
-            rm.add(del);
+            mutations.add(del);
             break;
           default:
             throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
         }
-        // To unify the response format with doNonAtomicRegionMutation and read through client's
-        // AsyncProcess we have to add an empty result instance per operation
-        resultOrExceptionOrBuilder.clear();
-        resultOrExceptionOrBuilder.setIndex(i++);
-        builder.addResultOrException(
-          resultOrExceptionOrBuilder.build());
       }
 
-      if (filter != null) {
-        return region.checkAndRowMutate(row, filter, timeRange, rm);
+      if (mutations.size() == 0) {
+        return new CheckAndMutateResult(true, null);
       } else {
-        return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
+        CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutations);
+        CheckAndMutateResult result = null;
+        if (region.getCoprocessorHost() != null) {
+          result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
+        }
+        if (result == null) {
+          result = region.checkAndMutate(checkAndMutate);
+          if (region.getCoprocessorHost() != null) {
+            result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
+          }
+        }
+        return result;
       }
     } finally {
       // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
@@ -2804,26 +2794,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
 
       try {
-        Condition condition = request.getCondition();
-        byte[] row = condition.getRow().toByteArray();
-        byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
-        byte[] qualifier =
-          condition.hasQualifier() ? condition.getQualifier().toByteArray() : null;
-        CompareOperator op = condition.hasCompareType() ?
-          CompareOperator.valueOf(condition.getCompareType().name()) :
-          null;
-        ByteArrayComparable comparator = condition.hasComparator() ?
-          ProtobufUtil.toComparator(condition.getComparator()) : null;
-        Filter filter =
-          condition.hasFilter() ? ProtobufUtil.toFilter(condition.getFilter()) : null;
-        TimeRange timeRange = condition.hasTimeRange() ?
-          ProtobufUtil.toTimeRange(condition.getTimeRange()) :
-          TimeRange.allTime();
-        boolean processed =
-          checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
-            qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
-            spaceQuotaEnforcement);
-        responseBuilder.setProcessed(processed);
+        CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
+          cellScanner, request.getCondition(), spaceQuotaEnforcement);
+        responseBuilder.setProcessed(result.isSuccess());
+        ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
+          ClientProtos.ResultOrException.newBuilder();
+        for (int i = 0; i < regionAction.getActionCount(); i++) {
+          // To unify the response format with doNonAtomicRegionMutation and read through
+          // client's AsyncProcess we have to add an empty result instance per operation
+          resultOrExceptionOrBuilder.clear();
+          resultOrExceptionOrBuilder.setIndex(i);
+          regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
+        }
       } catch (IOException e) {
         rpcServer.getMetrics().exception(e);
         // As it's an atomic operation with a condition, we may expect it's a global failure.
@@ -2865,80 +2847,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       try {
         if (regionAction.hasCondition()) {
           try {
-            Condition condition = regionAction.getCondition();
-            byte[] row = condition.getRow().toByteArray();
-            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
-            byte[] qualifier = condition.hasQualifier() ?
-              condition.getQualifier().toByteArray() : null;
-            CompareOperator op = condition.hasCompareType() ?
-              CompareOperator.valueOf(condition.getCompareType().name()) : null;
-            ByteArrayComparable comparator = condition.hasComparator() ?
-              ProtobufUtil.toComparator(condition.getComparator()) : null;
-            Filter filter = condition.hasFilter() ?
-              ProtobufUtil.toFilter(condition.getFilter()) : null;
-            TimeRange timeRange = condition.hasTimeRange() ?
-              ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
-
-            boolean processed;
-            if (regionAction.hasAtomic() && regionAction.getAtomic()) {
-              // RowMutations
-              processed =
-                checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
-                  qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
-                  spaceQuotaEnforcement);
-            } else {
-              if (regionAction.getActionList().isEmpty()) {
-                // If the region action list is empty, do nothing.
-                regionActionResultBuilder.setProcessed(true);
+            CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
+              cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
+            regionActionResultBuilder.setProcessed(result.isSuccess());
+            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
+              ClientProtos.ResultOrException.newBuilder();
+            for (int i = 0; i < regionAction.getActionCount(); i++) {
+              if (i == 0 && result.getResult() != null) {
+                resultOrExceptionOrBuilder.setIndex(i);
+                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
+                  .setResult(ProtobufUtil.toResult(result.getResult())).build());
                 continue;
               }
-              Action action = regionAction.getAction(0);
-              if (action.hasGet()) {
-                throw new DoNotRetryIOException("CheckAndMutate doesn't support GET="
-                  + action.getGet());
-              }
-              MutationProto mutation = action.getMutation();
-              switch (mutation.getMutateType()) {
-                case PUT:
-                  Put put = ProtobufUtil.toPut(mutation, cellScanner);
-                  checkCellSizeLimit(region, put);
-                  // Throws an exception when violated
-                  spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
-                  quota.addMutation(put);
-
-                  if (filter != null) {
-                    processed = region.checkAndMutate(row, filter, timeRange, put);
-                  } else {
-                    processed = region.checkAndMutate(row, family, qualifier, op, comparator,
-                      timeRange, put);
-                  }
-                  break;
-
-                case DELETE:
-                  Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
-                  checkCellSizeLimit(region, delete);
-                  spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
-                  quota.addMutation(delete);
-
-                  if (filter != null) {
-                    processed = region.checkAndMutate(row, filter, timeRange, delete);
-                  } else {
-                    processed = region.checkAndMutate(row, family, qualifier, op, comparator,
-                      timeRange, delete);
-                  }
-                  break;
-
-                default:
-                  throw new DoNotRetryIOException("CheckAndMutate doesn't support "
-                    + mutation.getMutateType());
-              }
-
               // To unify the response format with doNonAtomicRegionMutation and read through
               // client's AsyncProcess we have to add an empty result instance per operation
-              regionActionResultBuilder.addResultOrException(
-                ClientProtos.ResultOrException.newBuilder().setIndex(0).build());
+              resultOrExceptionOrBuilder.clear();
+              resultOrExceptionOrBuilder.setIndex(i);
+              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
             }
-            regionActionResultBuilder.setProcessed(processed);
           } catch (IOException e) {
             rpcServer.getMetrics().exception(e);
             // As it's an atomic operation with a condition, we may expect it's a global failure.
@@ -3039,10 +2965,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
     OperationQuota quota = null;
     RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
-    ActivePolicyEnforcement spaceQuotaEnforcement = null;
-    MutationType type = null;
-    HRegion region = null;
-    long before = EnvironmentEdgeManager.currentTime();
     // Clear scanner so we are not holding on to reference across call.
     if (controller != null) {
       controller.setCellScanner(null);
@@ -3051,143 +2973,58 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       checkOpen();
       requestCount.increment();
       rpcMutateRequestCount.increment();
-      region = getRegion(request.getRegion());
+      HRegion region = getRegion(request.getRegion());
       MutateResponse.Builder builder = MutateResponse.newBuilder();
       MutationProto mutation = request.getMutation();
       if (!region.getRegionInfo().isMetaRegion()) {
         regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
       }
       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
-      Result r = null;
-      Boolean processed = null;
-      type = mutation.getMutateType();
-
       quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
-      spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
-
-      switch (type) {
-        case APPEND:
-          // TODO: this doesn't actually check anything.
-          r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
-          break;
-        case INCREMENT:
-          // TODO: this doesn't actually check anything.
-          r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
-          break;
-        case PUT:
-          Put put = ProtobufUtil.toPut(mutation, cellScanner);
-          checkCellSizeLimit(region, put);
-          // Throws an exception when violated
-          spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
-          quota.addMutation(put);
-          if (request.hasCondition()) {
-            Condition condition = request.getCondition();
-            byte[] row = condition.getRow().toByteArray();
-            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
-            byte[] qualifier = condition.hasQualifier() ?
-              condition.getQualifier().toByteArray() : null;
-            CompareOperator op = condition.hasCompareType() ?
-              CompareOperator.valueOf(condition.getCompareType().name()) : null;
-            ByteArrayComparable comparator = condition.hasComparator() ?
-              ProtobufUtil.toComparator(condition.getComparator()) : null;
-            Filter filter = condition.hasFilter() ?
-              ProtobufUtil.toFilter(condition.getFilter()) : null;
-            TimeRange timeRange = condition.hasTimeRange() ?
-              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
-              TimeRange.allTime();
-            if (region.getCoprocessorHost() != null) {
-              if (filter != null) {
-                processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
-              } else {
-                processed = region.getCoprocessorHost()
-                  .preCheckAndPut(row, family, qualifier, op, comparator, put);
-              }
-            }
-            if (processed == null) {
-              boolean result;
-              if (filter != null) {
-                result = region.checkAndMutate(row, filter, timeRange, put);
-              } else {
-                result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
-                  put);
-              }
-              if (region.getCoprocessorHost() != null) {
-                if (filter != null) {
-                  result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
-                } else {
-                  result = region.getCoprocessorHost()
-                    .postCheckAndPut(row, family, qualifier, op, comparator, put, result);
-                }
-              }
-              processed = result;
-            }
-          } else {
-            region.put(put);
+      ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager()
+        .getActiveEnforcements();
+
+      if (request.hasCondition()) {
+        CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner,
+          request.getCondition(), spaceQuotaEnforcement);
+        builder.setProcessed(result.isSuccess());
+        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
+        addResult(builder, result.getResult(), controller, clientCellBlockSupported);
+        if (clientCellBlockSupported) {
+          addSize(context, result.getResult(), null);
+        }
+      } else {
+        Result r = null;
+        Boolean processed = null;
+        MutationType type = mutation.getMutateType();
+        switch (type) {
+          case APPEND:
+            // TODO: this doesn't actually check anything.
+            r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
+            break;
+          case INCREMENT:
+            // TODO: this doesn't actually check anything.
+            r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
+            break;
+          case PUT:
+            put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
             processed = Boolean.TRUE;
-          }
-          break;
-        case DELETE:
-          Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
-          checkCellSizeLimit(region, delete);
-          spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
-          quota.addMutation(delete);
-          if (request.hasCondition()) {
-            Condition condition = request.getCondition();
-            byte[] row = condition.getRow().toByteArray();
-            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
-            byte[] qualifier = condition.hasQualifier() ?
-              condition.getQualifier().toByteArray() : null;
-            CompareOperator op = condition.hasCompareType() ?
-              CompareOperator.valueOf(condition.getCompareType().name()) : null;
-            ByteArrayComparable comparator = condition.hasComparator() ?
-              ProtobufUtil.toComparator(condition.getComparator()) : null;
-            Filter filter = condition.hasFilter() ?
-              ProtobufUtil.toFilter(condition.getFilter()) : null;
-            TimeRange timeRange = condition.hasTimeRange() ?
-              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
-              TimeRange.allTime();
-            if (region.getCoprocessorHost() != null) {
-              if (filter != null) {
-                processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
-              } else {
-                processed = region.getCoprocessorHost()
-                  .preCheckAndDelete(row, family, qualifier, op, comparator, delete);
-              }
-            }
-            if (processed == null) {
-              boolean result;
-              if (filter != null) {
-                result = region.checkAndMutate(row, filter, timeRange, delete);
-              } else {
-                result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
-                  delete);
-              }
-              if (region.getCoprocessorHost() != null) {
-                if (filter != null) {
-                  result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
-                    result);
-                } else {
-                  result = region.getCoprocessorHost()
-                    .postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
-                }
-              }
-              processed = result;
-            }
-          } else {
-            region.delete(delete);
+            break;
+          case DELETE:
+            delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
             processed = Boolean.TRUE;
-          }
-          break;
-        default:
-          throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
-      }
-      if (processed != null) {
-        builder.setProcessed(processed.booleanValue());
-      }
-      boolean clientCellBlockSupported = isClientCellBlockSupport(context);
-      addResult(builder, r, controller, clientCellBlockSupported);
-      if (clientCellBlockSupported) {
-        addSize(context, r, null);
+            break;
+          default:
+            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+        }
+        if (processed != null) {
+          builder.setProcessed(processed);
+        }
+        boolean clientCellBlockSupported = isClientCellBlockSupport(context);
+        addResult(builder, r, controller, clientCellBlockSupported);
+        if (clientCellBlockSupported) {
+          addSize(context, r, null);
+        }
       }
       return builder.build();
     } catch (IOException ie) {
@@ -3197,32 +3034,79 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (quota != null) {
         quota.close();
       }
-      // Update metrics
-      final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
-      if (metricsRegionServer != null && type != null) {
-        long after = EnvironmentEdgeManager.currentTime();
-        switch (type) {
-        case DELETE:
-          if (request.hasCondition()) {
-            metricsRegionServer.updateCheckAndDelete(after - before);
-          } else {
-            metricsRegionServer.updateDelete(
-                region == null ? null : region.getRegionInfo().getTable(), after - before);
-          }
-          break;
+    }
+  }
+
+  private void put(HRegion region, OperationQuota quota, MutationProto mutation,
+    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
+    long before = EnvironmentEdgeManager.currentTime();
+    Put put = ProtobufUtil.toPut(mutation, cellScanner);
+    checkCellSizeLimit(region, put);
+    spaceQuota.getPolicyEnforcement(region).check(put);
+    quota.addMutation(put);
+    region.put(put);
+
+    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
+    if (metricsRegionServer != null) {
+      long after = EnvironmentEdgeManager.currentTime();
+      metricsRegionServer.updatePut(region.getRegionInfo().getTable(), after - before);
+    }
+  }
+
+  private void delete(HRegion region, OperationQuota quota, MutationProto mutation,
+    CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
+    long before = EnvironmentEdgeManager.currentTime();
+    Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+    checkCellSizeLimit(region, delete);
+    spaceQuota.getPolicyEnforcement(region).check(delete);
+    quota.addMutation(delete);
+    region.delete(delete);
+
+    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
+    if (metricsRegionServer != null) {
+      long after = EnvironmentEdgeManager.currentTime();
+      metricsRegionServer.updateDelete(region.getRegionInfo().getTable(), after - before);
+    }
+  }
+
+  private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota,
+    MutationProto mutation, CellScanner cellScanner, Condition condition,
+    ActivePolicyEnforcement spaceQuota) throws IOException {
+    long before = EnvironmentEdgeManager.currentTime();
+    CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation,
+      cellScanner);
+    checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction());
+    spaceQuota.getPolicyEnforcement(region).check((Mutation) checkAndMutate.getAction());
+    quota.addMutation((Mutation) checkAndMutate.getAction());
+
+    CheckAndMutateResult result = null;
+    if (region.getCoprocessorHost() != null) {
+      result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
+    }
+    if (result == null) {
+      result = region.checkAndMutate(checkAndMutate);
+      if (region.getCoprocessorHost() != null) {
+        result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
+      }
+    }
+    MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
+    if (metricsRegionServer != null) {
+      long after = EnvironmentEdgeManager.currentTime();
+      metricsRegionServer.updateCheckAndMutate(after - before);
+
+      MutationType type = mutation.getMutateType();
+      switch (type) {
         case PUT:
-          if (request.hasCondition()) {
-            metricsRegionServer.updateCheckAndPut(after - before);
-          } else {
-            metricsRegionServer.updatePut(
-                region == null ? null : region.getRegionInfo().getTable(),after - before);
-          }
+          metricsRegionServer.updateCheckAndPut(after - before);
+          break;
+        case DELETE:
+          metricsRegionServer.updateCheckAndDelete(after - before);
           break;
         default:
           break;
-        }
       }
     }
+    return result;
   }
 
   // This is used to keep compatible with the old client implementation. Consider remove it if we
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 1790468..d03d19f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -308,7 +310,11 @@ public interface Region extends ConfigurationObserver {
    * @param comparator the expected value
    * @param mutation data to put if check succeeds
    * @return true if mutation was applied, false otherwise
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #checkAndMutate(CheckAndMutate)} instead.
    */
+  @Deprecated
   default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
     ByteArrayComparable comparator, Mutation mutation) throws IOException {
     return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation);
@@ -327,7 +333,11 @@ public interface Region extends ConfigurationObserver {
    * @param mutation data to put if check succeeds
    * @param timeRange time range to check
    * @return true if mutation was applied, false otherwise
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #checkAndMutate(CheckAndMutate)} instead.
    */
+  @Deprecated
   boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
       ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
 
@@ -338,7 +348,11 @@ public interface Region extends ConfigurationObserver {
    * @param filter the filter
    * @param mutation data to put if check succeeds
    * @return true if mutation was applied, false otherwise
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #checkAndMutate(CheckAndMutate)} instead.
    */
+  @Deprecated
   default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation)
     throws IOException {
     return checkAndMutate(row, filter, TimeRange.allTime(), mutation);
@@ -352,7 +366,11 @@ public interface Region extends ConfigurationObserver {
    * @param mutation data to put if check succeeds
    * @param timeRange time range to check
    * @return true if mutation was applied, false otherwise
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #checkAndMutate(CheckAndMutate)} instead.
    */
+  @Deprecated
   boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation)
     throws IOException;
 
@@ -368,7 +386,11 @@ public interface Region extends ConfigurationObserver {
    * @param comparator the expected value
    * @param mutations data to put if check succeeds
    * @return true if mutations were applied, false otherwise
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #checkAndMutate(CheckAndMutate)} instead.
    */
+  @Deprecated
   default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
     ByteArrayComparable comparator, RowMutations mutations) throws IOException {
     return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(),
@@ -388,7 +410,11 @@ public interface Region extends ConfigurationObserver {
    * @param mutations data to put if check succeeds
    * @param timeRange time range to check
    * @return true if mutations were applied, false otherwise
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #checkAndMutate(CheckAndMutate)} instead.
    */
+  @Deprecated
   boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
       ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations)
       throws IOException;
@@ -401,7 +427,11 @@ public interface Region extends ConfigurationObserver {
    * @param filter the filter
    * @param mutations data to put if check succeeds
    * @return true if mutations were applied, false otherwise
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #checkAndMutate(CheckAndMutate)} instead.
    */
+  @Deprecated
   default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations)
     throws IOException {
     return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations);
@@ -416,11 +446,25 @@ public interface Region extends ConfigurationObserver {
    * @param mutations data to put if check succeeds
    * @param timeRange time range to check
    * @return true if mutations were applied, false otherwise
+   *
+   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
+   *   {@link #checkAndMutate(CheckAndMutate)} instead.
    */
+  @Deprecated
   boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange,
     RowMutations mutations) throws IOException;
 
   /**
+   * Atomically checks if a row matches the conditions and if it does, it performs the actions.
+   * The action carried by the CheckAndMutate object may be a single mutation (e.g. a Put or a
+   * Delete) or a RowMutations on the same row.
+   * @param checkAndMutate the CheckAndMutate object
+   * @return a CheckAndMutateResult object that represents the result for the CheckAndMutate
+   * @throws IOException if an error occurred in this method
+   */
+  CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException;
+
+  /**
    * Deletes the specified cells/row.
    * @param delete
    * @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 93c314d..56664db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RawCellBuilder;
@@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.RawCellBuilderFactory;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SharedConnection;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -67,8 +68,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -1051,322 +1050,70 @@ public class RegionCoprocessorHost
 
   /**
    * Supports Coprocessor 'bypass'.
-   * @param row row to check
-   * @param family column family
-   * @param qualifier column qualifier
-   * @param op the comparison operation
-   * @param comparator the comparator
-   * @param put data to put if check succeeds
+   * @param checkAndMutate the CheckAndMutate object
    * @return true or false to return to client if default processing should be bypassed, or null
-   * otherwise
-   */
-  public Boolean preCheckAndPut(final byte [] row, final byte [] family,
-      final byte [] qualifier, final CompareOperator op,
-      final ByteArrayComparable comparator, final Put put)
-      throws IOException {
-    boolean bypassable = true;
-    boolean defaultResult = false;
-    if (coprocEnvironments.isEmpty()) {
-      return null;
-    }
-    return execOperationWithResult(
-        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
-            defaultResult,  bypassable) {
-          @Override
-          public Boolean call(RegionObserver observer) throws IOException {
-            return observer.preCheckAndPut(this, row, family, qualifier,
-                op, comparator, put, getResult());
-          }
-        });
-  }
-
-  /**
-   * Supports Coprocessor 'bypass'.
-   * @param row row to check
-   * @param filter filter
-   * @param put data to put if check succeeds
-   * @return true or false to return to client if default processing should be bypassed, or null
-   * otherwise
+   *   otherwise; the true/false outcome is carried in the returned CheckAndMutateResult
+   * @throws IOException if an error occurred on the coprocessor
    */
-  public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put)
+  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
     throws IOException {
     boolean bypassable = true;
-    boolean defaultResult = false;
+    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
     if (coprocEnvironments.isEmpty()) {
       return null;
     }
     return execOperationWithResult(
-      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
-        defaultResult, bypassable) {
+      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
+        regionObserverGetter, defaultResult, bypassable) {
         @Override
-        public Boolean call(RegionObserver observer) throws IOException {
-          return observer.preCheckAndPut(this, row, filter, put, getResult());
+        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
+          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
         }
       });
   }
 
   /**
    * Supports Coprocessor 'bypass'.
-   * @param row row to check
-   * @param family column family
-   * @param qualifier column qualifier
-   * @param op the comparison operation
-   * @param comparator the comparator
-   * @param put data to put if check succeeds
-   * @return true or false to return to client if default processing should be bypassed, or null
-   *   otherwise
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
-      justification="Null is legit")
-  public Boolean preCheckAndPutAfterRowLock(
-      final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
-      final ByteArrayComparable comparator, final Put put) throws IOException {
-    boolean bypassable = true;
-    boolean defaultResult = false;
-    if (coprocEnvironments.isEmpty()) {
-      return null;
-    }
-    return execOperationWithResult(
-        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
-            defaultResult, bypassable) {
-          @Override
-          public Boolean call(RegionObserver observer) throws IOException {
-            return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier,
-                op, comparator, put, getResult());
-          }
-        });
-  }
-
-  /**
-   * Supports Coprocessor 'bypass'.
-   * @param row row to check
-   * @param filter filter
-   * @param put data to put if check succeeds
+   * @param checkAndMutate the CheckAndMutate object
    * @return true or false to return to client if default processing should be bypassed, or null
    *   otherwise
+   * @throws IOException if an error occurred on the coprocessor
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
-    justification="Null is legit")
-  public Boolean preCheckAndPutAfterRowLock(
-    final byte[] row, final Filter filter, final Put put) throws IOException {
+  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
+    throws IOException {
     boolean bypassable = true;
-    boolean defaultResult = false;
+    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
     if (coprocEnvironments.isEmpty()) {
       return null;
     }
     return execOperationWithResult(
-      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
-        defaultResult, bypassable) {
+      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
+        regionObserverGetter, defaultResult, bypassable) {
         @Override
-        public Boolean call(RegionObserver observer) throws IOException {
-          return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult());
+        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
+          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
         }
       });
   }
 
   /**
-   * @param row row to check
-   * @param family column family
-   * @param qualifier column qualifier
-   * @param op the comparison operation
-   * @param comparator the comparator
-   * @param put data to put if check succeeds
-   * @throws IOException e
-   */
-  public boolean postCheckAndPut(final byte [] row, final byte [] family,
-      final byte [] qualifier, final CompareOperator op,
-      final ByteArrayComparable comparator, final Put put,
-      boolean result) throws IOException {
-    if (this.coprocEnvironments.isEmpty()) {
-      return result;
-    }
-    return execOperationWithResult(
-        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
-          @Override
-          public Boolean call(RegionObserver observer) throws IOException {
-            return observer.postCheckAndPut(this, row, family, qualifier,
-                op, comparator, put, getResult());
-          }
-        });
-  }
-
-  /**
-   * @param row row to check
-   * @param filter filter
-   * @param put data to put if check succeeds
-   * @throws IOException e
-   */
-  public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put,
-    boolean result) throws IOException {
-    if (this.coprocEnvironments.isEmpty()) {
-      return result;
-    }
-    return execOperationWithResult(
-      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
-        @Override
-        public Boolean call(RegionObserver observer) throws IOException {
-          return observer.postCheckAndPut(this, row, filter, put, getResult());
-        }
-      });
-  }
-
-  /**
-   * Supports Coprocessor 'bypass'.
-   * @param row row to check
-   * @param family column family
-   * @param qualifier column qualifier
-   * @param op the comparison operation
-   * @param comparator the comparator
-   * @param delete delete to commit if check succeeds
-   * @return true or false to return to client if default processing should be bypassed, or null
-   *   otherwise
-   */
-  public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
-      final byte [] qualifier, final CompareOperator op,
-      final ByteArrayComparable comparator, final Delete delete)
-      throws IOException {
-    boolean bypassable = true;
-    boolean defaultResult = false;
-    if (coprocEnvironments.isEmpty()) {
-      return null;
-    }
-    return execOperationWithResult(
-        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
-            defaultResult, bypassable) {
-          @Override
-          public Boolean call(RegionObserver observer) throws IOException {
-            return observer.preCheckAndDelete(this, row, family,
-                qualifier, op, comparator, delete, getResult());
-          }
-        });
-  }
-
-  /**
-   * Supports Coprocessor 'bypass'.
-   * @param row row to check
-   * @param filter filter
-   * @param delete delete to commit if check succeeds
+   * @param checkAndMutate the CheckAndMutate object
+   * @param result the result returned by the checkAndMutate
    * @return true or false to return to client if default processing should be bypassed, or null
    *   otherwise
+   * @throws IOException if an error occurred on the coprocessor
    */
-  public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete)
-    throws IOException {
-    boolean bypassable = true;
-    boolean defaultResult = false;
-    if (coprocEnvironments.isEmpty()) {
-      return null;
-    }
-    return execOperationWithResult(
-      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
-        defaultResult, bypassable) {
-        @Override
-        public Boolean call(RegionObserver observer) throws IOException {
-          return observer.preCheckAndDelete(this, row, filter, delete, getResult());
-        }
-      });
-  }
-
-  /**
-   * Supports Coprocessor 'bypass'.
-   * @param row row to check
-   * @param family column family
-   * @param qualifier column qualifier
-   * @param op the comparison operation
-   * @param comparator the comparator
-   * @param delete delete to commit if check succeeds
-   * @return true or false to return to client if default processing should be bypassed,
-   * or null otherwise
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
-      justification="Null is legit")
-  public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
-      final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
-      final Delete delete) throws IOException {
-    boolean bypassable = true;
-    boolean defaultResult = false;
-    if (coprocEnvironments.isEmpty()) {
-      return null;
-    }
-    return execOperationWithResult(
-        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
-            defaultResult, bypassable) {
-          @Override
-          public Boolean call(RegionObserver observer) throws IOException {
-            return observer.preCheckAndDeleteAfterRowLock(this, row,
-                family, qualifier, op, comparator, delete, getResult());
-          }
-        });
-  }
-
-  /**
-   * Supports Coprocessor 'bypass'.
-   * @param row row to check
-   * @param filter filter
-   * @param delete delete to commit if check succeeds
-   * @return true or false to return to client if default processing should be bypassed,
-   * or null otherwise
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
-    justification="Null is legit")
-  public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter,
-    final Delete delete) throws IOException {
-    boolean bypassable = true;
-    boolean defaultResult = false;
-    if (coprocEnvironments.isEmpty()) {
-      return null;
-    }
-    return execOperationWithResult(
-      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
-        defaultResult, bypassable) {
-        @Override
-        public Boolean call(RegionObserver observer) throws IOException {
-          return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult());
-        }
-      });
-  }
-
-  /**
-   * @param row row to check
-   * @param family column family
-   * @param qualifier column qualifier
-   * @param op the comparison operation
-   * @param comparator the comparator
-   * @param delete delete to commit if check succeeds
-   * @throws IOException e
-   */
-  public boolean postCheckAndDelete(final byte [] row, final byte [] family,
-      final byte [] qualifier, final CompareOperator op,
-      final ByteArrayComparable comparator, final Delete delete,
-      boolean result) throws IOException {
-    if (this.coprocEnvironments.isEmpty()) {
-      return result;
-    }
-    return execOperationWithResult(
-        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
-          @Override
-          public Boolean call(RegionObserver observer) throws IOException {
-            return observer.postCheckAndDelete(this, row, family,
-                qualifier, op, comparator, delete, getResult());
-          }
-        });
-  }
-
-  /**
-   * @param row row to check
-   * @param filter filter
-   * @param delete delete to commit if check succeeds
-   * @throws IOException e
-   */
-  public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete,
-    boolean result) throws IOException {
+  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
+    CheckAndMutateResult result) throws IOException {
     if (this.coprocEnvironments.isEmpty()) {
       return result;
     }
     return execOperationWithResult(
-      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
+      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
+        regionObserverGetter, result) {
         @Override
-        public Boolean call(RegionObserver observer) throws IOException {
-          return observer.postCheckAndDelete(this, row, filter, delete, getResult());
+        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
+          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
         }
       });
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index ef62901..0727385 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -113,6 +115,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0);
   final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
   final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0);
+  final AtomicInteger ctPreCheckAndMutate = new AtomicInteger(0);
+  final AtomicInteger ctPreCheckAndMutateAfterRowLock = new AtomicInteger(0);
+  final AtomicInteger ctPostCheckAndMutate = new AtomicInteger(0);
   final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
   final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
   final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0);
@@ -584,6 +589,28 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   @Override
+  public CheckAndMutateResult preCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+    CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
+    ctPreCheckAndMutate.incrementAndGet();
+    return RegionObserver.super.preCheckAndMutate(c, checkAndMutate, result);
+  }
+
+  @Override
+  public CheckAndMutateResult preCheckAndMutateAfterRowLock(
+    ObserverContext<RegionCoprocessorEnvironment> c, CheckAndMutate checkAndMutate,
+    CheckAndMutateResult result) throws IOException {
+    ctPreCheckAndMutateAfterRowLock.incrementAndGet();
+    return RegionObserver.super.preCheckAndMutateAfterRowLock(c, checkAndMutate, result);
+  }
+
+  @Override
+  public CheckAndMutateResult postCheckAndMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+    CheckAndMutate checkAndMutate, CheckAndMutateResult result) throws IOException {
+    ctPostCheckAndMutate.incrementAndGet();
+    return RegionObserver.super.postCheckAndMutate(c, checkAndMutate, result);
+  }
+
+  @Override
   public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
       Append append) throws IOException {
     ctPreAppendAfterRowLock.incrementAndGet();
@@ -826,6 +853,18 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
     return ctPostCheckAndDeleteWithFilter.get();
   }
 
+  public int getPreCheckAndMutate() {
+    return ctPreCheckAndMutate.get();
+  }
+
+  public int getPreCheckAndMutateAfterRowLock() {
+    return ctPreCheckAndMutateAfterRowLock.get();
+  }
+
+  public int getPostCheckAndMutate() {
+    return ctPostCheckAndMutate.get();
+  }
+
   public boolean hadPreIncrement() {
     return ctPreIncrement.get() > 0;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 5b77027..a3502bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -45,11 +45,13 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -267,15 +269,17 @@ public class TestRegionObserverInterface {
       verifyMethodResult(SimpleRegionObserver.class,
         new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
           "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
-          "getPostCheckAndPutWithFilter" },
-        tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
+          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
 
       table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
       verifyMethodResult(SimpleRegionObserver.class,
         new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
           "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
-          "getPostCheckAndPutWithFilter" },
-        tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
+          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
 
       table.checkAndMutate(Bytes.toBytes(0),
           new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
@@ -283,8 +287,9 @@ public class TestRegionObserverInterface {
       verifyMethodResult(SimpleRegionObserver.class,
         new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
           "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
-          "getPostCheckAndPutWithFilter" },
-        tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
+          "getPostCheckAndPutWithFilter", "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
     } finally {
       util.deleteTable(tableName);
     }
@@ -304,16 +309,18 @@ public class TestRegionObserverInterface {
         SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
           "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
           "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
-          "getPostCheckAndDeleteWithFilter" },
-        tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
+          "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 0, 0, 0, 0, 0, 0, 0, 0, 0 });
 
       table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
       verifyMethodResult(
         SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
           "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
           "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
-          "getPostCheckAndDeleteWithFilter" },
-        tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
+          "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 1, 1, 1, 0, 0, 0, 1, 1, 1 });
 
       table.checkAndMutate(Bytes.toBytes(0),
           new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
@@ -322,8 +329,50 @@ public class TestRegionObserverInterface {
         SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
           "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
           "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
-          "getPostCheckAndDeleteWithFilter" },
-        tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
+          "getPostCheckAndDeleteWithFilter", "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 1, 1, 1, 1, 1, 1, 2, 2, 2 });
+    } finally {
+      util.deleteTable(tableName);
+      table.close();
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithRowMutationsHooks() throws Exception {
+    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." +
+      name.getMethodName());
+    Table table = util.createTable(tableName, new byte[][] { A, B, C });
+    try {
+      byte[] row = Bytes.toBytes(0);
+
+      Put p = new Put(row).addColumn(A, A, A);
+      table.put(p);
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 0, 0, 0 });
+
+      table.checkAndMutate(CheckAndMutate.newBuilder(row)
+        .ifEquals(A, A, A)
+        .build(new RowMutations(row)
+          .add((Mutation) new Put(row).addColumn(B, B, B))
+          .add((Mutation) new Delete(row))));
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 1, 1, 1 });
+
+      Object[] result = new Object[2];
+      table.batch(Arrays.asList(p, CheckAndMutate.newBuilder(row)
+        .ifEquals(A, A, A)
+        .build(new RowMutations(row)
+          .add((Mutation) new Put(row).addColumn(B, B, B))
+          .add((Mutation) new Delete(row)))), result);
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndMutate",
+          "getPreCheckAndMutateAfterRowLock", "getPostCheckAndMutate" },
+        tableName, new Integer[] { 2, 2, 2 });
     } finally {
       util.deleteTable(tableName);
       table.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 48b1c0a..803a33b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -99,6 +99,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
@@ -1775,6 +1777,7 @@ public class TestHRegion {
   // checkAndMutate tests
   // ////////////////////////////////////////////////////////////////////////////
   @Test
+  @Deprecated
   public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
     byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
@@ -1844,6 +1847,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndMutate_WithWrongValue() throws IOException {
     byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
@@ -1893,6 +1897,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndMutate_WithCorrectValue() throws IOException {
     byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
@@ -1941,6 +1946,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
     byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
@@ -2030,6 +2036,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndPut_ThatPutWasWritten() throws IOException {
     byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
@@ -2071,6 +2078,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndPut_wrongRowInPut() throws IOException {
     this.region = initHRegion(tableName, method, CONF, COLUMNS);
     Put put = new Put(row2);
@@ -2085,6 +2093,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
     byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
@@ -2158,6 +2167,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndMutate_WithFilters() throws Throwable {
     final byte[] FAMILY = Bytes.toBytes("fam");
 
@@ -2232,6 +2242,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
     final byte[] FAMILY = Bytes.toBytes("fam");
 
@@ -2280,6 +2291,7 @@ public class TestHRegion {
   }
 
   @Test
+  @Deprecated
   public void testCheckAndMutate_wrongMutationType() throws Throwable {
     // Setting up region
     this.region = initHRegion(tableName, method, CONF, fam1);
@@ -2289,7 +2301,7 @@ public class TestHRegion {
         new Increment(row).addColumn(fam1, qual1, 1));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("Action must be Put or Delete", e.getMessage());
+      assertEquals("Unsupported mutate type: INCREMENT", e.getMessage());
     }
 
     try {
@@ -2298,11 +2310,12 @@ public class TestHRegion {
         new Increment(row).addColumn(fam1, qual1, 1));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("Action must be Put or Delete", e.getMessage());
+      assertEquals("Unsupported mutate type: INCREMENT", e.getMessage());
     }
   }
 
   @Test
+  @Deprecated
   public void testCheckAndMutate_wrongRow() throws Throwable {
     final byte[] wrongRow = Bytes.toBytes("wrongRow");
 
@@ -2314,7 +2327,8 @@ public class TestHRegion {
         new Put(wrongRow).addColumn(fam1, qual1, value1));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("Action's getRow must match", e.getMessage());
+      assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+        + "match the original one <rowA>", e.getMessage());
     }
 
     try {
@@ -2323,7 +2337,8 @@ public class TestHRegion {
         new Put(wrongRow).addColumn(fam1, qual1, value1));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("Action's getRow must match", e.getMessage());
+      assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+        + "match the original one <rowA>", e.getMessage());
     }
 
     try {
@@ -2335,7 +2350,8 @@ public class TestHRegion {
           .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("Action's getRow must match", e.getMessage());
+      assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+        + "match the original one <rowA>", e.getMessage());
     }
 
     try {
@@ -2347,10 +2363,494 @@ public class TestHRegion {
           .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
       fail("should throw DoNotRetryIOException");
     } catch (DoNotRetryIOException e) {
-      assertEquals("Action's getRow must match", e.getMessage());
+      assertEquals("The row of the action (Put/Delete/RowMutations) <wrongRow> doesn't "
+        + "match the original one <rowA>", e.getMessage());
     }
   }
 
+  @Test
+  public void testCheckAndMutateWithEmptyRowValue() throws IOException {
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] fam1 = Bytes.toBytes("fam1");
+    byte[] qf1 = Bytes.toBytes("qualifier");
+    byte[] emptyVal = new byte[] {};
+    byte[] val1 = Bytes.toBytes("value1");
+    byte[] val2 = Bytes.toBytes("value2");
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, fam1);
+    // Building a put with an empty value; it is only applied through the checkAndMutate below
+    Put put = new Put(row1);
+    put.addColumn(fam1, qf1, emptyVal);
+
+    // checkAndPut with empty value (nothing has been written to the row yet)
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
+    assertTrue(res.isSuccess());
+
+    // Building a put that carries val1
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, val1);
+
+    // checkAndPut with correct value (the stored value is still empty at this point)
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
+    assertTrue(res.isSuccess());
+
+    // not empty anymore: the stored value is now val1, so the empty check must fail
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(put));
+    assertFalse(res.isSuccess());
+
+    Delete delete = new Delete(row1);
+    delete.addColumn(fam1, qf1);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
+    assertFalse(res.isSuccess());
+
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, val2);
+    // checkAndPut with correct value (val1 is stored, val2 gets written)
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
+    assertTrue(res.isSuccess());
+
+    // checkAndDelete with correct value (two addColumn calls: presumably removes two versions)
+    delete = new Delete(row1);
+    delete.addColumn(fam1, qf1);
+    delete.addColumn(fam1, qf1);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
+    assertTrue(res.isSuccess());
+
+    delete = new Delete(row1);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
+    assertTrue(res.isSuccess());
+
+    // checkAndPut that requires the column to be absent (ifNotExists)
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, val1);
+
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1).ifNotExists(fam1, qf1)
+      .build(put));
+    assertTrue(res.isSuccess());
+  }
+
+  @Test
+  public void testCheckAndMutateWithWrongValue() throws IOException {
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] fam1 = Bytes.toBytes("fam1");
+    byte[] qf1 = Bytes.toBytes("qualifier");
+    byte[] val1 = Bytes.toBytes("value1");
+    byte[] val2 = Bytes.toBytes("value2");
+    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
+    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, fam1);
+    // Putting val1 in key; every check below compares against a different value and must fail
+    Put put = new Put(row1);
+    put.addColumn(fam1, qf1, val1);
+    region.put(put);
+
+    // checkAndPut with wrong value
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(put));
+    assertFalse(res.isSuccess());
+
+    // checkAndDelete with wrong value: the Delete (not the Put) is the guarded mutation here
+    Delete delete = new Delete(row1);
+    delete.addFamily(fam1);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
+    assertFalse(res.isSuccess());
+
+    // Putting a BigDecimal value (bd1) in key
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
+    region.put(put);
+
+    // checkAndPut with wrong value
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(put));
+    assertFalse(res.isSuccess());
+
+    // checkAndDelete with wrong value
+    delete = new Delete(row1);
+    delete.addFamily(fam1);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd2)).build(delete));
+    assertFalse(res.isSuccess());
+  }
+
+  @Test
+  public void testCheckAndMutateWithCorrectValue() throws IOException {
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] fam1 = Bytes.toBytes("fam1");
+    byte[] qf1 = Bytes.toBytes("qualifier");
+    byte[] val1 = Bytes.toBytes("value1");
+    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, fam1);
+    // Putting data in key with an explicit timestamp; later mutations use now + 1 .. now + 3
+    long now = System.currentTimeMillis();
+    Put put = new Put(row1);
+    put.addColumn(fam1, qf1, now, val1);
+    region.put(put);
+
+    // checkAndPut with correct value (val1 is stored)
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
+    assertTrue("First", res.isSuccess());
+
+    // checkAndDelete with correct value (delete stamped now + 1)
+    Delete delete = new Delete(row1, now + 1);
+    delete.addColumn(fam1, qf1);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
+    assertTrue("Delete", res.isSuccess());
+
+    // Putting a BigDecimal value (bd1) in key at now + 2
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, now + 2, Bytes.toBytes(bd1));
+    region.put(put);
+
+    // checkAndPut with correct value (bd1 is stored)
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+        .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(put));
+    assertTrue("Second put", res.isSuccess());
+
+    // checkAndDelete with correct value (delete stamped now + 3)
+    delete = new Delete(row1, now + 3);
+    delete.addColumn(fam1, qf1);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+        .ifMatches(fam1, qf1, CompareOperator.EQUAL, Bytes.toBytes(bd1)).build(delete));
+    assertTrue("Second delete", res.isSuccess());
+  }
+
+  @Test
+  public void testCheckAndMutateWithNonEqualCompareOp() throws IOException {
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] fam1 = Bytes.toBytes("fam1");
+    byte[] qf1 = Bytes.toBytes("qualifier");
+    byte[] val1 = Bytes.toBytes("value1");
+    byte[] val2 = Bytes.toBytes("value2");
+    byte[] val3 = Bytes.toBytes("value3");
+    byte[] val4 = Bytes.toBytes("value4");
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, fam1);
+    // Putting val3 in key; each check below passes when <compare value> OP <stored value> holds
+    Put put = new Put(row1);
+    put.addColumn(fam1, qf1, val3);
+    region.put(put);
+
+    // Test CompareOperator.LESS: original = val3, compare with val3, fail
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.LESS, val3).build(put));
+    assertFalse(res.isSuccess());
+
+    // Test CompareOperator.LESS: original = val3, compare with val4, fail
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.LESS, val4).build(put));
+    assertFalse(res.isSuccess());
+
+    // Test CompareOperator.LESS: original = val3, compare with val2,
+    // succeed (now value = val2)
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, val2);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.LESS, val2).build(put));
+    assertTrue(res.isSuccess());
+
+    // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val3, fail
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val3).build(put));
+    assertFalse(res.isSuccess());
+
+    // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val2,
+    // succeed (value still = val2)
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val2).build(put));
+    assertTrue(res.isSuccess());
+
+    // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val1,
+    // succeed (now value = val3)
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, val3);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.LESS_OR_EQUAL, val1).build(put));
+    assertTrue(res.isSuccess());
+
+    // Test CompareOperator.GREATER: original = val3, compare with val3, fail
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.GREATER, val3).build(put));
+    assertFalse(res.isSuccess());
+
+    // Test CompareOperator.GREATER: original = val3, compare with val2, fail
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.GREATER, val2).build(put));
+    assertFalse(res.isSuccess());
+
+    // Test CompareOperator.GREATER: original = val3, compare with val4,
+    // succeed (now value = val2)
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, val2);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.GREATER, val4).build(put));
+    assertTrue(res.isSuccess());
+
+    // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val1, fail
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val1).build(put));
+    assertFalse(res.isSuccess());
+
+    // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val2,
+    // succeed (value still = val2)
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val2).build(put));
+    assertTrue(res.isSuccess());
+
+    // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.GREATER_OR_EQUAL, val3).build(put));
+    assertTrue(res.isSuccess());
+  }
+
+  @Test
+  public void testCheckAndPutThatPutWasWritten() throws IOException {
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] fam1 = Bytes.toBytes("fam1");
+    byte[] fam2 = Bytes.toBytes("fam2");
+    byte[] qf1 = Bytes.toBytes("qualifier");
+    byte[] val1 = Bytes.toBytes("value1");
+    byte[] val2 = Bytes.toBytes("value2");
+
+    byte[][] families = { fam1, fam2 };
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, families);
+    // Putting data in the key to check
+    Put put = new Put(row1);
+    put.addColumn(fam1, qf1, val1);
+    region.put(put);
+
+    // Creating put to add (targets fam2, a different family from the one being checked)
+    long ts = System.currentTimeMillis();
+    KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
+    put = new Put(row1);
+    put.add(kv);
+
+    // checkAndPut with correct value: fam1:qf1 holds val1, so the put into fam2 is applied
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(put));
+    assertTrue(res.isSuccess());
+
+    Get get = new Get(row1);
+    get.addColumn(fam2, qf1);
+    Cell[] actual = region.get(get).rawCells();
+
+    Cell[] expected = { kv };
+
+    assertEquals(expected.length, actual.length);
+    for (int i = 0; i < actual.length; i++) {
+      assertEquals(expected[i], actual[i]);
+    }
+  }
+
+  @Test
+  public void testCheckAndDeleteThatDeleteWasWritten() throws IOException {
+    byte[] row1 = Bytes.toBytes("row1");
+    byte[] fam1 = Bytes.toBytes("fam1");
+    byte[] fam2 = Bytes.toBytes("fam2");
+    byte[] qf1 = Bytes.toBytes("qualifier1");
+    byte[] qf2 = Bytes.toBytes("qualifier2");
+    byte[] qf3 = Bytes.toBytes("qualifier3");
+    byte[] val1 = Bytes.toBytes("value1");
+    byte[] val2 = Bytes.toBytes("value2");
+    byte[] val3 = Bytes.toBytes("value3");
+    byte[] emptyVal = new byte[] {};
+
+    byte[][] families = { fam1, fam2 };
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, families);
+    // Put content: val1 first, then after a short sleep a newer val2 for the same fam1:qf1
+    Put put = new Put(row1);
+    put.addColumn(fam1, qf1, val1);
+    region.put(put);
+    Threads.sleep(2);
+
+    put = new Put(row1);
+    put.addColumn(fam1, qf1, val2);
+    put.addColumn(fam2, qf1, val3);
+    put.addColumn(fam2, qf2, val2);
+    put.addColumn(fam2, qf3, val1);
+    put.addColumn(fam1, qf3, val1);
+    region.put(put);
+
+    LOG.info("get={}", region.get(new Get(row1).addColumn(fam1, qf1)).toString());
+
+    // Multi-column delete; the asserts below expect the older val1 of fam1:qf1 to remain
+    Delete delete = new Delete(row1);
+    delete.addColumn(fam1, qf1);
+    delete.addColumn(fam2, qf1);
+    delete.addColumn(fam1, qf3);
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val2).build(delete));
+    assertTrue(res.isSuccess());
+
+    Get get = new Get(row1);
+    get.addColumn(fam1, qf1);
+    get.addColumn(fam1, qf3);
+    get.addColumn(fam2, qf2);
+    Result r = region.get(get);
+    assertEquals(2, r.size());
+    assertArrayEquals(val1, r.getValue(fam1, qf1));
+    assertArrayEquals(val2, r.getValue(fam2, qf2));
+
+    // Family delete; fam2:qf1 was deleted above, so the check compares against the empty value
+    delete = new Delete(row1);
+    delete.addFamily(fam2);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam2, qf1, CompareOperator.EQUAL, emptyVal).build(delete));
+    assertTrue(res.isSuccess());
+
+    get = new Get(row1);
+    r = region.get(get);
+    assertEquals(1, r.size());
+    assertArrayEquals(val1, r.getValue(fam1, qf1));
+
+    // Row delete, guarded by the remaining val1 in fam1:qf1
+    delete = new Delete(row1);
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row1)
+      .ifMatches(fam1, qf1, CompareOperator.EQUAL, val1).build(delete));
+    assertTrue(res.isSuccess());
+    get = new Get(row1);
+    r = region.get(get);
+    assertEquals(0, r.size());
+  }
+
+  @Test
+  public void testCheckAndMutateWithFilters() throws Throwable {
+    final byte[] FAMILY = Bytes.toBytes("fam");
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+    // Put one row with columns A = "a", B = "b", C = "c"
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+    region.put(put);
+
+    // Put with success: both filters match the stored row (A = "a", B = "b")
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+    assertTrue(res.isSuccess());
+
+    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+    // Put with failure: the filter on B expects "c" but "b" is stored, so E must not be written
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("c"))))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
+    assertFalse(res.isSuccess());
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
+
+    // Delete with success: both filters match, so column D is removed
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))));
+    assertTrue(res.isSuccess());
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
+
+    // Mutate with success: RowMutations writes E and deletes A under the same matching filters
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))))
+      .build(new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
+    assertTrue(res.isSuccess());
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
+    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
+  }
+
+  @Test
+  public void testCheckAndMutateWithFiltersAndTimeRange() throws Throwable {
+    final byte[] FAMILY = Bytes.toBytes("fam");
+
+    // Setting up region
+    this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+    // Put with an explicit timestamp (100)
+    region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+    // Put with success: the time range 0..101 covers the cell written at timestamp 100
+    CheckAndMutateResult res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+        Bytes.toBytes("a")))
+      .timeRange(TimeRange.between(0, 101))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+    assertTrue(res.isSuccess());
+
+    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // Put with failure: range 0..100 excludes timestamp 100 (the upper bound is exclusive)
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+        Bytes.toBytes("a")))
+      .timeRange(TimeRange.between(0, 100))
+      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
+    assertFalse(res.isSuccess());
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
+
+    // Mutate with success: same filter and 0..101 range, applied to a RowMutations
+    res = region.checkAndMutate(CheckAndMutate.newBuilder(row)
+      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+        Bytes.toBytes("a")))
+      .timeRange(TimeRange.between(0, 101))
+      .build(new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))));
+    assertTrue(res.isSuccess());
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
+  }
+
   // ////////////////////////////////////////////////////////////////////////////
   // Delete tests
   // ////////////////////////////////////////////////////////////////////////////
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
index 4bf71c8..d046b36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java
@@ -151,6 +151,7 @@ public class TestMetricsRegionServer {
       rsm.updateDelete(null, 17);
       rsm.updateCheckAndDelete(17);
       rsm.updateCheckAndPut(17);
+      rsm.updateCheckAndMutate(17);
     }
 
     HELPER.assertCounter("appendNumOps", 24, serverSource);
@@ -162,7 +163,7 @@ public class TestMetricsRegionServer {
     HELPER.assertCounter("deleteNumOps", 17, serverSource);
     HELPER.assertCounter("checkAndDeleteNumOps", 17, serverSource);
     HELPER.assertCounter("checkAndPutNumOps", 17, serverSource);
-
+    HELPER.assertCounter("checkAndMutateNumOps", 17, serverSource);
 
     HELPER.assertCounter("slowAppendCount", 12, serverSource);
     HELPER.assertCounter("slowDeleteCount", 13, serverSource);