You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2020/03/04 06:08:41 UTC

[hbase] branch branch-2 updated: HBASE-23146 Support CheckAndMutate with multiple conditions (#1209)

This is an automated email from the ASF dual-hosted git repository.

brfrn169 pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new c3edceb  HBASE-23146 Support CheckAndMutate with multiple conditions (#1209)
c3edceb is described below

commit c3edceb6aef1c5a0bae6eeafeb74900895963a88
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Wed Mar 4 15:08:31 2020 +0900

    HBASE-23146 Support CheckAndMutate with multiple conditions (#1209)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../org/apache/hadoop/hbase/client/AsyncTable.java |  55 ++++++
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |  31 ++++
 .../org/apache/hadoop/hbase/client/HTable.java     | 123 ++++++++++---
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  87 +++++++--
 .../java/org/apache/hadoop/hbase/client/Table.java |  48 +++++
 .../hbase/shaded/protobuf/RequestConverter.java    |  89 ++++-----
 .../src/main/protobuf/Client.proto                 |   9 +-
 hbase-protocol/src/main/protobuf/Client.proto      |   9 +-
 .../hadoop/hbase/rest/client/RemoteHTable.java     |  34 ++--
 .../hadoop/hbase/coprocessor/RegionObserver.java   | 137 +++++++++++++-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  79 ++++++--
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 111 +++++++----
 .../apache/hadoop/hbase/regionserver/Region.java   |  53 ++++++
 .../hbase/regionserver/RegionCoprocessorHost.java  | 151 ++++++++++++++-
 .../apache/hadoop/hbase/client/TestAsyncTable.java | 203 +++++++++++++++++++++
 .../hadoop/hbase/client/TestCheckAndMutate.java    | 190 +++++++++++++++++++
 .../hbase/client/TestMalformedCellFromClient.java  |   5 +-
 .../hbase/coprocessor/SimpleRegionObserver.java    |  97 ++++++++--
 .../coprocessor/TestRegionObserverInterface.java   |  51 +++++-
 .../hadoop/hbase/regionserver/RegionAsTable.java   |   6 +
 .../hadoop/hbase/regionserver/TestHRegion.java     | 123 +++++++++++++
 .../hadoop/hbase/thrift2/client/ThriftTable.java   |   6 +
 22 files changed, 1493 insertions(+), 204 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index bfcc187..e10f1f82 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -290,6 +291,60 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
   }
 
   /**
+   * Atomically checks if a row matches the specified filter. If it does, it applies the given
+   * Put/Delete/RowMutations.
+   * <p>
+   * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
+   * execute it. This is a fluent-style API; typical usage looks like:
+   *
+   * <pre>
+   * <code>
+   * table.checkAndMutate(row, filter).thenPut(put)
+   *     .thenAccept(succ -> {
+   *       if (succ) {
+   *         System.out.println("Check and put succeeded");
+   *       } else {
+   *         System.out.println("Check and put failed");
+   *       }
+   *     });
+   * </code>
+   * </pre>
+   */
+  CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);
+
+  /**
+   * A helper class for building and sending a checkAndMutate request with a filter.
+   */
+  interface CheckAndMutateWithFilterBuilder {
+
+    /**
+     * @param timeRange time range to check.
+     */
+    CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
+
+    /**
+     * @param put data to put if check succeeds
+     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
+     *         will be wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenPut(Put put);
+
+    /**
+     * @param delete data to delete if check succeeds
+     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
+     *         value will be wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenDelete(Delete delete);
+
+    /**
+     * @param mutation mutations to perform if check succeeds
+     * @return {@code true} if the mutations were executed, {@code false} otherwise. The return
+     *         value will be wrapped by a {@link CompletableFuture}.
+     */
+    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
+  }
+
+  /**
    * Performs multiple mutations atomically on a single row. Currently {@link Put} and
    * {@link Delete} are supported.
    * @param mutation object that specifies the set of mutations to perform atomically
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 2256a4c..d6406b6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -174,6 +175,36 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
   }
 
   @Override
+  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+    return new CheckAndMutateWithFilterBuilder() {
+
+      private final CheckAndMutateWithFilterBuilder builder =
+        rawTable.checkAndMutate(row, filter);
+
+      @Override
+      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
+        builder.timeRange(timeRange);
+        return this;
+      }
+
+      @Override
+      public CompletableFuture<Boolean> thenPut(Put put) {
+        return wrap(builder.thenPut(put));
+      }
+
+      @Override
+      public CompletableFuture<Boolean> thenDelete(Delete delete) {
+        return wrap(builder.thenDelete(delete));
+      }
+
+      @Override
+      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
+        return wrap(builder.thenMutate(mutation));
+      }
+    };
+  }
+
+  @Override
   public CompletableFuture<Void> mutateRow(RowMutations mutation) {
     return wrap(rawTable.mutateRow(mutation));
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 54ab606..9b30de6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -39,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -676,14 +675,16 @@ public class HTable implements Table {
   @Deprecated
   public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
       final byte [] value, final Put put) throws IOException {
-    return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
+    return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
+      put);
   }
 
   @Override
   @Deprecated
   public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
       final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
-    return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
+    return doCheckAndPut(row, family, qualifier, toCompareOperator(compareOp), value, null,
+      null, put);
   }
 
   @Override
@@ -692,21 +693,20 @@ public class HTable implements Table {
       final CompareOperator op, final byte [] value, final Put put) throws IOException {
     // The name of the operators in CompareOperator are intentionally those of the
     // operators in the filter's CompareOp enum.
-    return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
+    return doCheckAndPut(row, family, qualifier, op, value, null, null, put);
   }
 
   private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
-    final String opName, final byte[] value, final TimeRange timeRange, final Put put)
-    throws IOException {
+    final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
+    final Put put) throws IOException {
     ClientServiceCallable<Boolean> callable =
         new ClientServiceCallable<Boolean>(this.connection, getName(), row,
             this.rpcControllerFactory.newController(), put.getPriority()) {
       @Override
       protected Boolean rpcCall() throws Exception {
-        CompareType compareType = CompareType.valueOf(opName);
         MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-            new BinaryComparator(value), compareType, timeRange, put);
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
+          filter, timeRange, put);
         MutateResponse response = doMutate(request);
         return Boolean.valueOf(response.getProcessed());
       }
@@ -719,37 +719,37 @@ public class HTable implements Table {
   @Deprecated
   public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
     final byte[] value, final Delete delete) throws IOException {
-    return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
-      delete);
+    return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, null,
+      null, delete);
   }
 
   @Override
   @Deprecated
   public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
     final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
-    return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
+    return doCheckAndDelete(row, family, qualifier, toCompareOperator(compareOp), value, null,
+      null, delete);
   }
 
   @Override
   @Deprecated
   public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
     final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
-    return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
+    return doCheckAndDelete(row, family, qualifier, op, value, null, null, delete);
   }
 
   private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
-    final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
-    throws IOException {
+    final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
+    final Delete delete) throws IOException {
     CancellableRegionServerCallable<SingleResponse> callable =
       new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
         this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
         new RetryingTimeTracker().start(), delete.getPriority()) {
         @Override
         protected SingleResponse rpcCall() throws Exception {
-          CompareType compareType = CompareType.valueOf(opName);
           MutateRequest request = RequestConverter
             .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
-              qualifier, new BinaryComparator(value), compareType, timeRange, delete);
+              qualifier, op, value, filter, timeRange, delete);
           MutateResponse response = doMutate(request);
           return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
         }
@@ -776,8 +776,14 @@ public class HTable implements Table {
     return new CheckAndMutateBuilderImpl(row, family);
   }
 
+  @Override
+  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
+  }
+
   private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
-    final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm)
+    final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
+    final RowMutations rm)
     throws IOException {
     CancellableRegionServerCallable<MultiResponse> callable =
     new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
@@ -785,10 +791,9 @@ public class HTable implements Table {
         rm.getMaxPriority()) {
       @Override
       protected MultiResponse rpcCall() throws Exception {
-        CompareType compareType = CompareType.valueOf(opName);
         MultiRequest request = RequestConverter
-          .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-            new BinaryComparator(value), compareType, timeRange, rm);
+          .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
+            qualifier, op, value, filter, timeRange, rm);
         ClientProtos.MultiResponse response = doMulti(request);
         ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
         if (res.hasException()) {
@@ -833,14 +838,43 @@ public class HTable implements Table {
   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
     final CompareOp compareOp, final byte [] value, final RowMutations rm)
   throws IOException {
-    return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
+    return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
+      null, rm);
   }
 
   @Override
   @Deprecated
   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
       final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
-    return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
+    return doCheckAndMutate(row, family, qualifier, op, value, null, null, rm);
+  }
+
+  private CompareOperator toCompareOperator(CompareOp compareOp) {
+    switch (compareOp) {
+      case LESS:
+        return CompareOperator.LESS;
+
+      case LESS_OR_EQUAL:
+        return CompareOperator.LESS_OR_EQUAL;
+
+      case EQUAL:
+        return CompareOperator.EQUAL;
+
+      case NOT_EQUAL:
+        return CompareOperator.NOT_EQUAL;
+
+      case GREATER_OR_EQUAL:
+        return CompareOperator.GREATER_OR_EQUAL;
+
+      case GREATER:
+        return CompareOperator.GREATER;
+
+      case NO_OP:
+        return CompareOperator.NO_OP;
+
+      default:
+        throw new AssertionError();
+    }
   }
 
   @Override
@@ -1247,19 +1281,54 @@ public class HTable implements Table {
     public boolean thenPut(Put put) throws IOException {
       validatePut(put);
       preCheck();
-      return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
+      return doCheckAndPut(row, family, qualifier, op, value, null, timeRange, put);
     }
 
     @Override
     public boolean thenDelete(Delete delete) throws IOException {
       preCheck();
-      return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete);
+      return doCheckAndDelete(row, family, qualifier, op, value, null, timeRange, delete);
     }
 
     @Override
     public boolean thenMutate(RowMutations mutation) throws IOException {
       preCheck();
-      return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation);
+      return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange,
+        mutation);
+    }
+  }
+
+  private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder {
+
+    private final byte[] row;
+    private final Filter filter;
+    private TimeRange timeRange;
+
+    CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
+      this.row = Preconditions.checkNotNull(row, "row is null");
+      this.filter = Preconditions.checkNotNull(filter, "filter is null");
+    }
+
+    @Override
+    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
+      this.timeRange = timeRange;
+      return this;
+    }
+
+    @Override
+    public boolean thenPut(Put put) throws IOException {
+      validatePut(put);
+      return doCheckAndPut(row, null, null, null, null, filter, timeRange, put);
+    }
+
+    @Override
+    public boolean thenDelete(Delete delete) throws IOException {
+      return doCheckAndDelete(row, null, null, null, null, filter, timeRange, delete);
+    }
+
+    @Override
+    public boolean thenMutate(RowMutations mutation) throws IOException {
+      return doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation);
     }
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 8050137..b4a6cc4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRespo
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
 
 /**
  * The implementation of RawAsyncTable.
@@ -358,10 +357,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
       validatePut(put, conn.connConf.getMaxKeyValueSize());
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
+        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
           stub, put,
-          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
+          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+            null, timeRange, p),
           (c, r) -> r.getProcessed()))
         .call();
     }
@@ -370,10 +369,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     public CompletableFuture<Boolean> thenDelete(Delete delete) {
       preCheck();
       return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
+        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
           loc, stub, delete,
-          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
+          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+            null, timeRange, d),
           (c, r) -> r.getProcessed()))
         .call();
     }
@@ -381,12 +380,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     @Override
     public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
       preCheck();
-      return RawAsyncTableImpl.this
-        .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller,
+      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
+        rpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
           loc, stub, mutation,
-          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
-            new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
+          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
+            null, timeRange, rm),
           resp -> resp.getExists()))
         .call();
     }
@@ -397,6 +396,68 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     return new CheckAndMutateBuilderImpl(row, family);
   }
 
+
+  private final class CheckAndMutateWithFilterBuilderImpl
+    implements CheckAndMutateWithFilterBuilder {
+
+    private final byte[] row;
+
+    private final Filter filter;
+
+    private TimeRange timeRange;
+
+    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
+      this.row = Preconditions.checkNotNull(row, "row is null");
+      this.filter = Preconditions.checkNotNull(filter, "filter is null");
+    }
+
+    @Override
+    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
+      this.timeRange = timeRange;
+      return this;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> thenPut(Put put) {
+      validatePut(put, conn.connConf.getMaxKeyValueSize());
+      return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
+          stub, put,
+          (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
+            filter, timeRange, p),
+          (c, r) -> r.getProcessed()))
+        .call();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> thenDelete(Delete delete) {
+      return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
+          loc, stub, delete,
+          (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
+            filter, timeRange, d),
+          (c, r) -> r.getProcessed()))
+        .call();
+    }
+
+    @Override
+    public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
+      return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
+        rpcTimeoutNs)
+        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
+          loc, stub, mutation,
+          (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
+            filter, timeRange, rm),
+          resp -> resp.getExists()))
+        .call();
+    }
+  }
+
+  @Override
+  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
+  }
+
   // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
   // so here I write a new method as I do not want to change the abstraction of call method.
   private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index a6c10ea..cc2d414 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -541,6 +542,53 @@ public interface Table extends Closeable {
      * @return {@code true} if the new delete was executed, {@code false} otherwise.
      */
     boolean thenDelete(Delete delete) throws IOException;
+
+    /**
+     * @param mutation mutations to perform if check succeeds
+     * @return true if the new mutation was executed, false otherwise.
+     */
+    boolean thenMutate(RowMutations mutation) throws IOException;
+  }
+
+  /**
+   * Atomically checks if a row matches the specified filter. If it does, it applies the given
+   * Put/Delete/RowMutations.
+   * <p>
+   * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
+   * execute it. This is a fluent-style API; typical usage looks like:
+   *
+   * <pre>
+   * <code>
+   * table.checkAndMutate(row, filter).thenPut(put);
+   * </code>
+   * </pre>
+   */
+  default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+    throw new NotImplementedException("Add an implementation!");
+  }
+
+  /**
+   * A helper class for building and sending a checkAndMutate request with a filter.
+   */
+  interface CheckAndMutateWithFilterBuilder {
+
+    /**
+     * @param timeRange time range to check
+     */
+    CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);
+
+    /**
+     * @param put data to put if check succeeds
+     * @return {@code true} if the new put was executed, {@code false} otherwise.
+     */
+    boolean thenPut(Put put) throws IOException;
+
+    /**
+     * @param delete data to delete if check succeeds
+     * @return {@code true} if the new delete was executed, {@code false} otherwise.
+     */
+    boolean thenDelete(Delete delete) throws IOException;
+
     /**
      * @param mutation mutations to perform if check succeeds
      * @return true if the new mutation was executed, false otherwise.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index afdd653..f293bcf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.ClusterMetrics.Option;
 import org.apache.hadoop.hbase.ClusterMetricsBuilder;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -56,7 +57,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -236,71 +238,51 @@ public final class RequestConverter {
   /**
    * Create a protocol buffer MutateRequest for a conditioned put
    *
-   * @param regionName
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param comparator
-   * @param compareType
-   * @param put
    * @return a mutate request
    * @throws IOException
    */
   public static MutateRequest buildMutateRequest(
-      final byte[] regionName, final byte[] row, final byte[] family,
-      final byte [] qualifier, final ByteArrayComparable comparator,
-      final CompareType compareType, TimeRange timeRange, final Put put) throws IOException {
-    return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
-      , put, MutationType.PUT);
+    final byte[] regionName, final byte[] row, final byte[] family,
+    final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+    final TimeRange timeRange, final Put put) throws IOException {
+    return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
+      put, MutationType.PUT);
   }
 
   /**
    * Create a protocol buffer MutateRequest for a conditioned delete
    *
-   * @param regionName
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param comparator
-   * @param compareType
-   * @param delete
    * @return a mutate request
    * @throws IOException
    */
   public static MutateRequest buildMutateRequest(
-      final byte[] regionName, final byte[] row, final byte[] family,
-      final byte [] qualifier, final ByteArrayComparable comparator,
-      final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException {
-    return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
-      , delete, MutationType.DELETE);
+    final byte[] regionName, final byte[] row, final byte[] family,
+    final byte [] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+    final TimeRange timeRange, final Delete delete) throws IOException {
+    return buildMutateRequest(regionName, row, family, qualifier, op, value, filter, timeRange,
+      delete, MutationType.DELETE);
   }
 
   public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
-    final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator,
-    final CompareType compareType, TimeRange timeRange, final Mutation mutation,
+    final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
+    final Filter filter, final TimeRange timeRange, final Mutation mutation,
     final MutationType type) throws IOException {
     return MutateRequest.newBuilder()
       .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
       .setMutation(ProtobufUtil.toMutation(type, mutation))
-      .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
+      .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
       .build();
   }
+
   /**
    * Create a protocol buffer MutateRequest for conditioned row mutations
    *
-   * @param regionName
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param comparator
-   * @param compareType
-   * @param rowMutations
    * @return a mutate request
    * @throws IOException
    */
   public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
     final byte[] row, final byte[] family, final byte[] qualifier,
-    final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange,
+    final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
     final RowMutations rowMutations) throws IOException {
     RegionAction.Builder builder =
         getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
@@ -308,7 +290,7 @@ public final class RequestConverter {
     ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
     MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
     for (Mutation mutation: rowMutations.getMutations()) {
-      MutationType mutateType = null;
+      MutationType mutateType;
       if (mutation instanceof Put) {
         mutateType = MutationType.PUT;
       } else if (mutation instanceof Delete) {
@@ -324,7 +306,7 @@ public final class RequestConverter {
       builder.addAction(actionBuilder.build());
     }
     return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
-        .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
+        .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
         .build();
   }
 
@@ -1080,25 +1062,26 @@ public final class RequestConverter {
   /**
    * Create a protocol buffer Condition
    *
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param comparator
-   * @param compareType
    * @return a Condition
    * @throws IOException
    */
   public static Condition buildCondition(final byte[] row, final byte[] family,
-    final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType,
-    final TimeRange timeRange) {
-    return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row))
-      .setFamily(UnsafeByteOperations.unsafeWrap(family))
-      .setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ?
-        HConstants.EMPTY_BYTE_ARRAY : qualifier))
-      .setComparator(ProtobufUtil.toComparator(comparator))
-      .setCompareType(compareType)
-      .setTimeRange(ProtobufUtil.toTimeRange(timeRange))
-      .build();
+    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+    final TimeRange timeRange) throws IOException {
+
+    Condition.Builder builder = Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row));
+
+    if (filter != null) {
+      builder.setFilter(ProtobufUtil.toFilter(filter));
+    } else {
+      builder.setFamily(UnsafeByteOperations.unsafeWrap(family))
+        .setQualifier(UnsafeByteOperations.unsafeWrap(
+          qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
+        .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)))
+        .setCompareType(CompareType.valueOf(op.name()));
+    }
+
+    return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
   }
 
   /**
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index a22c623..810aaaa 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -139,11 +139,12 @@ message GetResponse {
  */
 message Condition {
   required bytes row = 1;
-  required bytes family = 2;
-  required bytes qualifier = 3;
-  required CompareType compare_type = 4;
-  required Comparator comparator = 5;
+  optional bytes family = 2;
+  optional bytes qualifier = 3;
+  optional CompareType compare_type = 4;
+  optional Comparator comparator = 5;
   optional TimeRange time_range = 6;
+  optional Filter filter = 7;
 }
 
 
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 3681ae9..b985582 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -139,11 +139,12 @@ message GetResponse {
  */
 message Condition {
   required bytes row = 1;
-  required bytes family = 2;
-  required bytes qualifier = 3;
-  required CompareType compare_type = 4;
-  required Comparator comparator = 5;
+  optional bytes family = 2;
+  optional bytes qualifier = 3;
+  optional CompareType compare_type = 4;
+  optional Comparator comparator = 5;
   optional TimeRange time_range = 6;
+  optional Filter filter = 7;
 }
 
 
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index d4feb5e..c77f463 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -24,6 +24,19 @@ import com.google.protobuf.Message;
 import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -33,12 +46,13 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
@@ -65,19 +79,6 @@ import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
@@ -792,6 +793,11 @@ public class RemoteHTable implements Table {
   }
 
   @Override
+  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+    throw new NotImplementedException("Implement later");
+  }
+
+  @Override
   @Deprecated
   public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
       CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 8761d6b..05071fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -511,9 +512,8 @@ public interface RegionObserver {
    * @param op the comparison operation
    * @param comparator the comparator
    * @param put data to put if check succeeds
-   * @param result
-   * @return the return value to return to client if bypassing default
-   * processing
+   * @param result the default value of the result
+   * @return the return value to return to client if bypassing default processing
    */
   default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
       byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
@@ -522,6 +522,26 @@ public interface RegionObserver {
   }
 
   /**
+   * Called before checkAndPut.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions.
+   * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+   * <p>
+   * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param row row to check
+   * @param filter filter
+   * @param put data to put if check succeeds
+   * @param result the default value of the result
+   * @return the return value to return to client if bypassing default processing
+   */
+  default boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+    Filter filter, Put put, boolean result) throws IOException {
+    return result;
+  }
+
+  /**
    * Called before checkAndPut but after acquiring rowlock.
    * <p>
    * <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
@@ -540,9 +560,8 @@ public interface RegionObserver {
    * @param op the comparison operation
    * @param comparator the comparator
    * @param put data to put if check succeeds
-   * @param result
-   * @return the return value to return to client if bypassing default
-   * processing
+   * @param result the default value of the result
+   * @return the return value to return to client if bypassing default processing
    */
   default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
       byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
@@ -551,6 +570,30 @@ public interface RegionObserver {
   }
 
   /**
+   * Called before checkAndPut but after acquiring rowlock.
+   * <p>
+   * <b>Note:</b> Avoid long-running operations in this hook, because the row stays locked
+   * for longer while it runs. Trying to acquire a lock on another row from within this hook
+   * can lead to a deadlock.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions.
+   * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+   * <p>
+   * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param row row to check
+   * @param filter filter
+   * @param put data to put if check succeeds
+   * @param result the default value of the result
+   * @return the return value to return to client if bypassing default processing
+   */
+  default boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+    byte[] row, Filter filter, Put put, boolean result) throws IOException {
+    return result;
+  }
+
+  /**
    * Called after checkAndPut
    * <p>
    * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
@@ -572,6 +615,23 @@ public interface RegionObserver {
   }
 
   /**
+   * Called after checkAndPut
+   * <p>
+   * Note: Do not retain references to any Cells in 'put' beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param row row to check
+   * @param filter filter
+   * @param put data to put if check succeeds
+   * @param result from the checkAndPut
+   * @return the possibly transformed return value to return to client
+   */
+  default boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+    Filter filter, Put put, boolean result) throws IOException {
+    return result;
+  }
+
+  /**
    * Called before checkAndDelete.
    * <p>
    * Call CoprocessorEnvironment#bypass to skip default actions.
@@ -586,7 +646,7 @@ public interface RegionObserver {
    * @param op the comparison operation
    * @param comparator the comparator
    * @param delete delete to commit if check succeeds
-   * @param result
+   * @param result the default value of the result
    * @return the value to return to client if bypassing default processing
    */
   default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
@@ -596,6 +656,26 @@ public interface RegionObserver {
   }
 
   /**
+   * Called before checkAndDelete.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions.
+   * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+   * <p>
+   * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param row row to check
+   * @param filter filter
+   * @param delete delete to commit if check succeeds
+   * @param result the default value of the result
+   * @return the value to return to client if bypassing default processing
+   */
+  default boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+    Filter filter, Delete delete, boolean result) throws IOException {
+    return result;
+  }
+
+  /**
    * Called before checkAndDelete but after acquiring rowock.
    * <p>
    * <b>Note:</b> Caution to be taken for not doing any long time operation in this hook.
@@ -614,7 +694,7 @@ public interface RegionObserver {
    * @param op the comparison operation
    * @param comparator the comparator
    * @param delete delete to commit if check succeeds
-   * @param result
+   * @param result the default value of the result
    * @return the value to return to client if bypassing default processing
    */
   default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
@@ -624,6 +704,30 @@ public interface RegionObserver {
   }
 
   /**
+   * Called before checkAndDelete but after acquiring rowlock.
+   * <p>
+   * <b>Note:</b> Avoid long-running operations in this hook, because the row stays locked
+   * for longer while it runs. Trying to acquire a lock on another row from within this hook
+   * can lead to a deadlock.
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions.
+   * If 'bypass' is set, we skip out on calling any subsequent chained coprocessors.
+   * <p>
+   * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param row row to check
+   * @param filter filter
+   * @param delete delete to commit if check succeeds
+   * @param result the default value of the result
+   * @return the value to return to client if bypassing default processing
+   */
+  default boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+    byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
+    return result;
+  }
+
+  /**
    * Called after checkAndDelete
    * <p>
    * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
@@ -645,6 +749,23 @@ public interface RegionObserver {
   }
 
   /**
+   * Called after checkAndDelete
+   * <p>
+   * Note: Do not retain references to any Cells in 'delete' beyond the life of this invocation.
+   * If you need a Cell reference for later use, copy the cell and use that.
+   * @param c the environment provided by the region server
+   * @param row row to check
+   * @param filter filter
+   * @param delete delete to commit if check succeeds
+   * @param result from the CheckAndDelete
+   * @return the possibly transformed returned value to return to client
+   */
+  default boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+    Filter filter, Delete delete, boolean result) throws IOException {
+    return result;
+  }
+
+  /**
    * Called before Append.
    * <p>
    * Call CoprocessorEnvironment#bypass to skip default actions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dac034d..37adbca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.HFileLink;
@@ -4176,13 +4177,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
     ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
     checkMutationType(mutation, row);
-    return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation);
+    return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, null,
+      mutation);
+  }
+
+  @Override
+  public boolean checkAndMutate(byte[] row, Filter filter, TimeRange timeRange, Mutation mutation)
+    throws IOException {
+    return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, null, mutation);
   }
 
   @Override
   public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
     ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
-    return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null);
+    return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, timeRange, rm, null);
+  }
+
+  @Override
+  public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange, RowMutations rm)
+    throws IOException {
+    return doCheckAndRowMutate(row, null, null, null, null, filter, timeRange, rm, null);
   }
 
   /**
@@ -4190,7 +4204,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * switches in the few places where there is deviation.
    */
   private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
-    CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange,
+    CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
     RowMutations rowMutations, Mutation mutation)
   throws IOException {
     // Could do the below checks but seems wacky with two callers only. Just comment out for now.
@@ -4204,8 +4218,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     startRegionOperation();
     try {
       Get get = new Get(row);
-      checkFamily(family);
-      get.addColumn(family, qualifier);
+      if (family != null) {
+        checkFamily(family);
+        get.addColumn(family, qualifier);
+      }
+      if (filter != null) {
+        get.setFilter(filter);
+      }
       if (timeRange != null) {
         get.setTimeRange(timeRange.getMin(), timeRange.getMax());
       }
@@ -4217,11 +4236,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // Call coprocessor.
           Boolean processed = null;
           if (mutation instanceof Put) {
-            processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
-                qualifier, op, comparator, (Put)mutation);
+            if (filter != null) {
+              processed = this.getCoprocessorHost()
+                .preCheckAndPutAfterRowLock(row, filter, (Put) mutation);
+            } else {
+              processed = this.getCoprocessorHost()
+                .preCheckAndPutAfterRowLock(row, family, qualifier, op, comparator,
+                  (Put) mutation);
+            }
           } else if (mutation instanceof Delete) {
-            processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
-                qualifier, op, comparator, (Delete)mutation);
+            if (filter != null) {
+              processed = this.getCoprocessorHost()
+                .preCheckAndDeleteAfterRowLock(row, filter, (Delete) mutation);
+            } else {
+              processed = this.getCoprocessorHost()
+                .preCheckAndDeleteAfterRowLock(row, family, qualifier, op, comparator,
+                  (Delete) mutation);
+            }
           }
           if (processed != null) {
             return processed;
@@ -4231,20 +4262,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // Supposition is that now all changes are done under row locks, then when we go to read,
         // we'll get the latest on this row.
         List<Cell> result = get(get, false);
-        boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
         boolean matches = false;
         long cellTs = 0;
-        if (result.isEmpty() && valueIsNull) {
-          matches = true;
-        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
-          matches = true;
-          cellTs = result.get(0).getTimestamp();
-        } else if (result.size() == 1 && !valueIsNull) {
-          Cell kv = result.get(0);
-          cellTs = kv.getTimestamp();
-          int compareResult = PrivateCellUtil.compareValue(kv, comparator);
-          matches = matches(op, compareResult);
+        if (filter != null) {
+          if (!result.isEmpty()) {
+            matches = true;
+            cellTs = result.get(0).getTimestamp();
+          }
+        } else {
+          boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
+          if (result.isEmpty() && valueIsNull) {
+            matches = true;
+          } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
+            matches = true;
+            cellTs = result.get(0).getTimestamp();
+          } else if (result.size() == 1 && !valueIsNull) {
+            Cell kv = result.get(0);
+            cellTs = kv.getTimestamp();
+            int compareResult = PrivateCellUtil.compareValue(kv, comparator);
+            matches = matches(op, compareResult);
+          }
         }
+
         // If matches put the new put or delete the new delete
         if (matches) {
           // We have acquired the row lock already. If the system clock is NOT monotonically
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index ed4ead0..ab1aea3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -613,9 +614,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @param cellScanner if non-null, the mutation data -- the Cell content.
    */
   private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
-    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
-    ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
-    ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
+    final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
+    CompareOperator op, ByteArrayComparable comparator, Filter filter, TimeRange timeRange,
+    RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement)
+    throws IOException {
     int countOfCompleteMutation = 0;
     try {
       if (!region.getRegionInfo().isMetaRegion()) {
@@ -658,7 +660,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         builder.addResultOrException(
           resultOrExceptionOrBuilder.build());
       }
-      return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
+
+      if (filter != null) {
+        return region.checkAndRowMutate(row, filter, timeRange, rm);
+      } else {
+        return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
+      }
     } finally {
       // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
       // even if the malformed cells are not skipped.
@@ -2728,18 +2735,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           if (request.hasCondition()) {
             Condition condition = request.getCondition();
             byte[] row = condition.getRow().toByteArray();
-            byte[] family = condition.getFamily().toByteArray();
-            byte[] qualifier = condition.getQualifier().toByteArray();
-            CompareOperator op =
-              CompareOperator.valueOf(condition.getCompareType().name());
-            ByteArrayComparable comparator =
-                ProtobufUtil.toComparator(condition.getComparator());
+            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+            byte[] qualifier = condition.hasQualifier() ?
+              condition.getQualifier().toByteArray() : null;
+            CompareOperator op = condition.hasCompareType() ?
+              CompareOperator.valueOf(condition.getCompareType().name()) : null;
+            ByteArrayComparable comparator = condition.hasComparator() ?
+              ProtobufUtil.toComparator(condition.getComparator()) : null;
+            Filter filter = condition.hasFilter() ?
+              ProtobufUtil.toFilter(condition.getFilter()) : null;
             TimeRange timeRange = condition.hasTimeRange() ?
               ProtobufUtil.toTimeRange(condition.getTimeRange()) :
               TimeRange.allTime();
             processed =
               checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
-                qualifier, op, comparator, timeRange, regionActionResultBuilder,
+                qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
                 spaceQuotaEnforcement);
           } else {
             doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
@@ -2878,24 +2888,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           if (request.hasCondition()) {
             Condition condition = request.getCondition();
             byte[] row = condition.getRow().toByteArray();
-            byte[] family = condition.getFamily().toByteArray();
-            byte[] qualifier = condition.getQualifier().toByteArray();
-            CompareOperator compareOp =
-              CompareOperator.valueOf(condition.getCompareType().name());
-            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
+            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+            byte[] qualifier = condition.hasQualifier() ?
+              condition.getQualifier().toByteArray() : null;
+            CompareOperator op = condition.hasCompareType() ?
+              CompareOperator.valueOf(condition.getCompareType().name()) : null;
+            ByteArrayComparable comparator = condition.hasComparator() ?
+              ProtobufUtil.toComparator(condition.getComparator()) : null;
+            Filter filter = condition.hasFilter() ?
+              ProtobufUtil.toFilter(condition.getFilter()) : null;
             TimeRange timeRange = condition.hasTimeRange() ?
               ProtobufUtil.toTimeRange(condition.getTimeRange()) :
               TimeRange.allTime();
             if (region.getCoprocessorHost() != null) {
-              processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
-                  compareOp, comparator, put);
+              if (filter != null) {
+                processed = region.getCoprocessorHost().preCheckAndPut(row, filter, put);
+              } else {
+                processed = region.getCoprocessorHost()
+                  .preCheckAndPut(row, family, qualifier, op, comparator, put);
+              }
             }
             if (processed == null) {
-              boolean result = region.checkAndMutate(row, family,
-                qualifier, compareOp, comparator, timeRange, put);
+              boolean result;
+              if (filter != null) {
+                result = region.checkAndMutate(row, filter, timeRange, put);
+              } else {
+                result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
+                  put);
+              }
               if (region.getCoprocessorHost() != null) {
-                result = region.getCoprocessorHost().postCheckAndPut(row, family,
-                  qualifier, compareOp, comparator, put, result);
+                if (filter != null) {
+                  result = region.getCoprocessorHost().postCheckAndPut(row, filter, put, result);
+                } else {
+                  result = region.getCoprocessorHost()
+                    .postCheckAndPut(row, family, qualifier, op, comparator, put, result);
+                }
               }
               processed = result;
             }
@@ -2912,23 +2939,42 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           if (request.hasCondition()) {
             Condition condition = request.getCondition();
             byte[] row = condition.getRow().toByteArray();
-            byte[] family = condition.getFamily().toByteArray();
-            byte[] qualifier = condition.getQualifier().toByteArray();
-            CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
-            ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
+            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+            byte[] qualifier = condition.hasQualifier() ?
+              condition.getQualifier().toByteArray() : null;
+            CompareOperator op = condition.hasCompareType() ?
+              CompareOperator.valueOf(condition.getCompareType().name()) : null;
+            ByteArrayComparable comparator = condition.hasComparator() ?
+              ProtobufUtil.toComparator(condition.getComparator()) : null;
+            Filter filter = condition.hasFilter() ?
+              ProtobufUtil.toFilter(condition.getFilter()) : null;
             TimeRange timeRange = condition.hasTimeRange() ?
               ProtobufUtil.toTimeRange(condition.getTimeRange()) :
               TimeRange.allTime();
             if (region.getCoprocessorHost() != null) {
-              processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
-                  comparator, delete);
+              if (filter != null) {
+                processed = region.getCoprocessorHost().preCheckAndDelete(row, filter, delete);
+              } else {
+                processed = region.getCoprocessorHost()
+                  .preCheckAndDelete(row, family, qualifier, op, comparator, delete);
+              }
             }
             if (processed == null) {
-              boolean result = region.checkAndMutate(row, family,
-                qualifier, op, comparator, timeRange, delete);
+              boolean result;
+              if (filter != null) {
+                result = region.checkAndMutate(row, filter, timeRange, delete);
+              } else {
+                result = region.checkAndMutate(row, family, qualifier, op, comparator, timeRange,
+                  delete);
+              }
               if (region.getCoprocessorHost() != null) {
-                result = region.getCoprocessorHost().postCheckAndDelete(row, family,
-                  qualifier, op, comparator, delete, result);
+                if (filter != null) {
+                  result = region.getCoprocessorHost().postCheckAndDelete(row, filter, delete,
+                    result);
+                } else {
+                  result = region.getCoprocessorHost()
+                    .postCheckAndDelete(row, family, qualifier, op, comparator, delete, result);
+                }
               }
               processed = result;
             }
@@ -2979,7 +3025,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           break;
         default:
           break;
-
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index ecc2158..1790468 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -331,6 +332,31 @@ public interface Region extends ConfigurationObserver {
       ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
 
   /**
+   * Atomically checks if a row matches the filter and if it does, it performs the mutation. See
+   * checkAndRowMutate to do many checkAndPuts at a time on a single row.
+   * @param row to check
+   * @param filter the filter
+   * @param mutation mutation (Put or Delete) to apply if check succeeds
+   * @return true if mutation was applied, false otherwise
+   */
+  default boolean checkAndMutate(byte [] row, Filter filter, Mutation mutation)
+    throws IOException {
+    return checkAndMutate(row, filter, TimeRange.allTime(), mutation);
+  }
+
+  /**
+   * Atomically checks if a row matches the filter and if it does, it performs the mutation. See
+   * checkAndRowMutate to do many checkAndPuts at a time on a single row.
+   * @param row to check
+   * @param filter the filter
+   * @param timeRange time range to check
+   * @param mutation mutation (Put or Delete) to apply if check succeeds
+   * @return true if mutation was applied, false otherwise
+   */
+  boolean checkAndMutate(byte [] row, Filter filter, TimeRange timeRange, Mutation mutation)
+    throws IOException;
+
+  /**
    * Atomically checks if a row/family/qualifier value matches the expected values and if it does,
    * it performs the row mutations. If the passed value is null, the lack of column value
    * (ie: non-existence) is used. Use to do many mutations on a single row. Use checkAndMutate
@@ -368,6 +394,33 @@ public interface Region extends ConfigurationObserver {
       throws IOException;
 
   /**
+   * Atomically checks if a row matches the filter and if it does, it performs the row mutations.
+   * Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
+   * time.
+   * @param row to check
+   * @param filter the filter
+   * @param mutations mutations to apply if check succeeds
+   * @return true if mutations were applied, false otherwise
+   */
+  default boolean checkAndRowMutate(byte[] row, Filter filter, RowMutations mutations)
+    throws IOException {
+    return checkAndRowMutate(row, filter, TimeRange.allTime(), mutations);
+  }
+
+  /**
+   * Atomically checks if a row matches the filter and if it does, it performs the row mutations.
+   * Use to do many mutations on a single row. Use checkAndMutate to do one checkAndMutate at a
+   * time.
+   * @param row to check
+   * @param filter the filter
+   * @param timeRange time range to check
+   * @param mutations mutations to apply if check succeeds
+   * @return true if mutations were applied, false otherwise
+   */
+  boolean checkAndRowMutate(byte [] row, Filter filter, TimeRange timeRange,
+    RowMutations mutations) throws IOException;
+
+  /**
    * Deletes the specified cells/row.
    * @param delete
    * @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index c75e562..331e7cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -1082,13 +1083,38 @@ public class RegionCoprocessorHost
   /**
    * Supports Coprocessor 'bypass'.
    * @param row row to check
+   * @param filter filter
+   * @param put data to put if check succeeds
+   * @return true or false to return to client if default processing should be bypassed, or null
+   *   otherwise
+   */
+  public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put)
+    throws IOException {
+    boolean bypassable = true;
+    boolean defaultResult = false;
+    if (coprocEnvironments.isEmpty()) {
+      return null;
+    }
+    return execOperationWithResult(
+      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
+        defaultResult, bypassable) {
+        @Override
+        public Boolean call(RegionObserver observer) throws IOException {
+          return observer.preCheckAndPut(this, row, filter, put, getResult());
+        }
+      });
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
    * @param op the comparison operation
    * @param comparator the comparator
    * @param put data to put if check succeeds
    * @return true or false to return to client if default processing should be bypassed, or null
-   * otherwise
+   *   otherwise
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
       justification="Null is legit")
@@ -1112,6 +1138,33 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * Supports Coprocessor 'bypass'.
+   * @param row row to check
+   * @param filter filter
+   * @param put data to put if check succeeds
+   * @return true or false to return to client if default processing should be bypassed, or null
+   *   otherwise
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
+    justification="Null is legit")
+  public Boolean preCheckAndPutAfterRowLock(
+    final byte[] row, final Filter filter, final Put put) throws IOException {
+    boolean bypassable = true;
+    boolean defaultResult = false;
+    if (coprocEnvironments.isEmpty()) {
+      return null;
+    }
+    return execOperationWithResult(
+      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
+        defaultResult, bypassable) {
+        @Override
+        public Boolean call(RegionObserver observer) throws IOException {
+          return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult());
+        }
+      });
+  }
+
+  /**
    * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
@@ -1138,6 +1191,26 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * @param row row to check
+   * @param filter filter
+   * @param put data to put if check succeeds
+   * @throws IOException e
+   */
+  public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put,
+    boolean result) throws IOException {
+    if (this.coprocEnvironments.isEmpty()) {
+      return result;
+    }
+    return execOperationWithResult(
+      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
+        @Override
+        public Boolean call(RegionObserver observer) throws IOException {
+          return observer.postCheckAndPut(this, row, filter, put, getResult());
+        }
+      });
+  }
+
+  /**
    * Supports Coprocessor 'bypass'.
    * @param row row to check
    * @param family column family
@@ -1145,8 +1218,8 @@ public class RegionCoprocessorHost
    * @param op the comparison operation
    * @param comparator the comparator
    * @param delete delete to commit if check succeeds
-   * @return true or false to return to client if default processing should be bypassed,
-   * or null otherwise
+   * @return true or false to return to client if default processing should be bypassed, or null
+   *   otherwise
    */
   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
       final byte [] qualifier, final CompareOperator op,
@@ -1171,6 +1244,31 @@ public class RegionCoprocessorHost
   /**
    * Supports Coprocessor 'bypass'.
    * @param row row to check
+   * @param filter filter
+   * @param delete delete to commit if check succeeds
+   * @return true or false to return to client if default processing should be bypassed, or null
+   *   otherwise
+   */
+  public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete)
+    throws IOException {
+    boolean bypassable = true;
+    boolean defaultResult = false;
+    if (coprocEnvironments.isEmpty()) {
+      return null;
+    }
+    return execOperationWithResult(
+      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
+        defaultResult, bypassable) {
+        @Override
+        public Boolean call(RegionObserver observer) throws IOException {
+          return observer.preCheckAndDelete(this, row, filter, delete, getResult());
+        }
+      });
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
    * @param op the comparison operation
@@ -1201,6 +1299,33 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * Supports Coprocessor 'bypass'.
+   * @param row row to check
+   * @param filter filter
+   * @param delete delete to commit if check succeeds
+   * @return true or false to return to client if default processing should be bypassed, or null
+   *   otherwise
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
+    justification="Null is legit")
+  public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter,
+    final Delete delete) throws IOException {
+    boolean bypassable = true;
+    boolean defaultResult = false;
+    if (coprocEnvironments.isEmpty()) {
+      return null;
+    }
+    return execOperationWithResult(
+      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
+        defaultResult, bypassable) {
+        @Override
+        public Boolean call(RegionObserver observer) throws IOException {
+          return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult());
+        }
+      });
+  }
+
+  /**
    * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
@@ -1227,6 +1352,26 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * @param row row to check
+   * @param filter filter
+   * @param delete delete to commit if check succeeds
+   * @throws IOException e
+   */
+  public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete,
+    boolean result) throws IOException {
+    if (this.coprocEnvironments.isEmpty()) {
+      return result;
+    }
+    return execOperationWithResult(
+      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
+        @Override
+        public Boolean call(RegionObserver observer) throws IOException {
+          return observer.postCheckAndDelete(this, row, filter, delete, getResult());
+        }
+      });
+  }
+
+  /**
    * Supports Coprocessor 'bypass'.
    * @param append append object
    * @return result to return to client if default operation should be bypassed, null otherwise
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 63080b9..b9fb811 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -41,10 +42,17 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -402,6 +410,201 @@ public class TestAsyncTable {
   }
 
   @Test
+  public void testCheckAndMutateWithSingleFilter() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    // Put one row
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+    table.put(put).get();
+
+    // Put with success
+    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("a")))
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+      .get();
+    assertTrue(ok);
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+    // Put with failure
+    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("b")))
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
+      .get();
+    assertFalse(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
+
+    // Delete with success
+    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("a")))
+      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
+      .get();
+    assertTrue(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
+
+    // Mutate with success
+    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+        CompareOperator.EQUAL, Bytes.toBytes("b")))
+      .thenMutate(new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
+      .get();
+    assertTrue(ok);
+
+    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
+  }
+
+  @Test
+  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    // Put one row
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+    table.put(put).get();
+
+    // Put with success
+    boolean ok = table.checkAndMutate(row, new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))
+      ))
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+      .get();
+    assertTrue(ok);
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+    // Put with failure
+    ok = table.checkAndMutate(row, new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("c"))
+      ))
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
+      .get();
+    assertFalse(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
+
+    // Delete with success
+    ok = table.checkAndMutate(row, new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))
+      ))
+      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
+      .get();
+    assertTrue(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
+
+    // Mutate with success
+    ok = table.checkAndMutate(row, new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))
+      ))
+      .thenMutate(new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
+      .get();
+    assertTrue(ok);
+
+    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
+  }
+
+  @Test
+  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    // Put with an explicit timestamp
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
+
+    // Put with success
+    boolean ok = table.checkAndMutate(row, new FilterList(
+        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+        new TimestampsFilter(Collections.singletonList(100L))
+      ))
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
+      .get();
+    assertTrue(ok);
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // Put with failure
+    ok = table.checkAndMutate(row, new FilterList(
+        new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+        new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+        new TimestampsFilter(Collections.singletonList(101L))
+      ))
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
+      .get();
+    assertFalse(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
+  }
+
+  @Test
+  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+
+    // Put with an explicit timestamp
+    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
+      .get();
+
+    // Put with success
+    boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("a")))
+      .timeRange(TimeRange.between(0, 101))
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
+      .get();
+    assertTrue(ok);
+
+    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // Put with failure
+    ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("a")))
+      .timeRange(TimeRange.between(0, 100))
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
+      .get();
+    assertFalse(ok);
+
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+    getTable.get().checkAndMutate(row, FAMILY)
+      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+  }
+
+  @Test
   public void testDisabled() throws InterruptedException, ExecutionException {
     ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
index 15ef065..934c09c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
@@ -17,13 +17,24 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -184,4 +195,183 @@ public class TestCheckAndMutate {
     }
   }
 
+  @Test
+  public void testCheckAndMutateWithSingleFilter() throws Throwable {
+    try (Table table = createTable()) {
+      // put one row
+      putOneRow(table);
+      // get row back and assert the values
+      getOneRowAndAssertAllExist(table);
+
+      // Put with success
+      boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY,
+          Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+      // Put with failure
+      ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+          CompareOperator.EQUAL, Bytes.toBytes("b")))
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
+
+      // Delete with success
+      ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+          CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")));
+      assertTrue(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
+
+      // Mutate with success
+      ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+          CompareOperator.EQUAL, Bytes.toBytes("b")))
+        .thenMutate(new RowMutations(ROWKEY)
+          .add((Mutation) new Put(ROWKEY)
+            .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+          .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
+      assertTrue(ok);
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+    try (Table table = createTable()) {
+      // put one row
+      putOneRow(table);
+      // get row back and assert the values
+      getOneRowAndAssertAllExist(table);
+
+      // Put with success
+      boolean ok = table.checkAndMutate(ROWKEY, new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+            Bytes.toBytes("b"))
+        ))
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+      // Put with failure
+      ok = table.checkAndMutate(ROWKEY, new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+            Bytes.toBytes("c"))
+        ))
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
+
+      // Delete with success
+      ok = table.checkAndMutate(ROWKEY, new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+            Bytes.toBytes("b"))
+        ))
+        .thenDelete(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D")));
+      assertTrue(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
+
+      // Mutate with success
+      ok = table.checkAndMutate(ROWKEY, new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+            Bytes.toBytes("b"))
+        ))
+        .thenMutate(new RowMutations(ROWKEY)
+          .add((Mutation) new Put(ROWKEY)
+            .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+          .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A"))));
+      assertTrue(ok);
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+    try (Table table = createTable()) {
+      // Put with an explicit timestamp
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+      // Put with success
+      boolean ok = table.checkAndMutate(ROWKEY, new FilterList(
+          new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+          new TimestampsFilter(Collections.singletonList(100L))
+        ))
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      // Put with failure
+      ok = table.checkAndMutate(ROWKEY, new FilterList(
+          new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+          new TimestampsFilter(Collections.singletonList(101L))
+        ))
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+    try (Table table = createTable()) {
+      // Put with an explicit timestamp
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+      // Put with success
+      boolean ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY,
+          Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .timeRange(TimeRange.between(0, 101))
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      // Put with failure
+      ok = table.checkAndMutate(ROWKEY, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+          CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .timeRange(TimeRange.between(0, 100))
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+    }
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+    try (Table table = createTable()) {
+      table.checkAndMutate(ROWKEY, FAMILY)
+        .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index ef4ca25..43d72e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -30,12 +30,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -238,8 +238,7 @@ public class TestMalformedCellFromClient {
     ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
     ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
     ClientProtos.Condition condition = RequestConverter
-      .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
-        HBaseProtos.CompareType.EQUAL, null);
+      .buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null);
     for (Mutation mutation : rm.getMutations()) {
       ClientProtos.MutationProto.MutationType mutateType = null;
       if (mutation instanceof Put) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index caf0abb..523466d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -99,11 +100,17 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   final AtomicInteger ctPostIncrement = new AtomicInteger(0);
   final AtomicInteger ctPostAppend = new AtomicInteger(0);
   final AtomicInteger ctPreCheckAndPut = new AtomicInteger(0);
+  final AtomicInteger ctPreCheckAndPutWithFilter = new AtomicInteger(0);
   final AtomicInteger ctPreCheckAndPutAfterRowLock = new AtomicInteger(0);
+  final AtomicInteger ctPreCheckAndPutWithFilterAfterRowLock = new AtomicInteger(0);
   final AtomicInteger ctPostCheckAndPut = new AtomicInteger(0);
+  final AtomicInteger ctPostCheckAndPutWithFilter = new AtomicInteger(0);
   final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0);
+  final AtomicInteger ctPreCheckAndDeleteWithFilter = new AtomicInteger(0);
   final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0);
+  final AtomicInteger ctPreCheckAndDeleteWithFilterAfterRowLock = new AtomicInteger(0);
   final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
+  final AtomicInteger ctPostCheckAndDeleteWithFilter = new AtomicInteger(0);
   final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
   final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
   final AtomicInteger ctPostScannerFilterRow = new AtomicInteger(0);
@@ -493,6 +500,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   @Override
+  public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+    Filter filter, Put put, boolean result) throws IOException {
+    ctPreCheckAndPutWithFilter.incrementAndGet();
+    return true;
+  }
+
+  @Override
   public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
       byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
       ByteArrayComparable comparator, Put put, boolean result) throws IOException {
@@ -501,6 +515,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   @Override
+  public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+    byte[] row, Filter filter, Put put, boolean result) throws IOException {
+    ctPreCheckAndPutWithFilterAfterRowLock.incrementAndGet();
+    return true;
+  }
+
+  @Override
   public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
                                  byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
                                  Put put, boolean result) throws IOException {
@@ -509,6 +530,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   @Override
+  public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+    Filter filter, Put put, boolean result) throws IOException {
+    ctPostCheckAndPutWithFilter.incrementAndGet();
+    return true;
+  }
+
+  @Override
   public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
                                    byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
                                    Delete delete, boolean result) throws IOException {
@@ -517,6 +545,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   @Override
+  public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+    Filter filter, Delete delete, boolean result) throws IOException {
+    ctPreCheckAndDeleteWithFilter.incrementAndGet();
+    return true;
+  }
+
+  @Override
   public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
       byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
       ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
@@ -525,6 +560,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   @Override
+  public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+    byte[] row, Filter filter, Delete delete, boolean result) throws IOException {
+    ctPreCheckAndDeleteWithFilterAfterRowLock.incrementAndGet();
+    return true;
+  }
+
+  @Override
   public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
                                     byte[] family, byte[] qualifier, CompareOperator compareOp, ByteArrayComparable comparator,
                                     Delete delete, boolean result) throws IOException {
@@ -533,6 +575,13 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   @Override
+  public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> e, byte[] row,
+    Filter filter, Delete delete, boolean result) throws IOException {
+    ctPostCheckAndDeleteWithFilter.incrementAndGet();
+    return true;
+  }
+
+  @Override
   public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e,
       Append append) throws IOException {
     ctPreAppendAfterRowLock.incrementAndGet();
@@ -693,28 +742,52 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
     return ctPostCloseRegionOperation.get();
   }
 
-  public boolean hadPreCheckAndPut() {
-    return ctPreCheckAndPut.get() > 0;
+  public int getPreCheckAndPut() {
+    return ctPreCheckAndPut.get();
+  }
+
+  public int getPreCheckAndPutWithFilter() {
+    return ctPreCheckAndPutWithFilter.get();
+  }
+
+  public int getPreCheckAndPutAfterRowLock() {
+    return ctPreCheckAndPutAfterRowLock.get();
+  }
+
+  public int getPreCheckAndPutWithFilterAfterRowLock() {
+    return ctPreCheckAndPutWithFilterAfterRowLock.get();
+  }
+
+  public int getPostCheckAndPut() {
+    return ctPostCheckAndPut.get();
+  }
+
+  public int getPostCheckAndPutWithFilter() {
+    return ctPostCheckAndPutWithFilter.get();
+  }
+
+  public int getPreCheckAndDelete() {
+    return ctPreCheckAndDelete.get();
   }
 
-  public boolean hadPreCheckAndPutAfterRowLock() {
-    return ctPreCheckAndPutAfterRowLock.get() > 0;
+  public int getPreCheckAndDeleteWithFilter() {
+    return ctPreCheckAndDeleteWithFilter.get();
   }
 
-  public boolean hadPostCheckAndPut() {
-    return ctPostCheckAndPut.get() > 0;
+  public int getPreCheckAndDeleteAfterRowLock() {
+    return ctPreCheckAndDeleteAfterRowLock.get();
   }
 
-  public boolean hadPreCheckAndDelete() {
-    return ctPreCheckAndDelete.get() > 0;
+  public int getPreCheckAndDeleteWithFilterAfterRowLock() {
+    return ctPreCheckAndDeleteWithFilterAfterRowLock.get();
   }
 
-  public boolean hadPreCheckAndDeleteAfterRowLock() {
-    return ctPreCheckAndDeleteAfterRowLock.get() > 0;
+  public int getPostCheckAndDelete() {
+    return ctPostCheckAndDelete.get();
   }
 
-  public boolean hadPostCheckAndDelete() {
-    return ctPostCheckAndDelete.get() > 0;
+  public int getPostCheckAndDeleteWithFilter() {
+    return ctPostCheckAndDeleteWithFilter.get();
   }
 
   public boolean hadPreIncrement() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 8a12cd6..5b77027 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.filter.FilterAllFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -263,12 +265,26 @@ public class TestRegionObserverInterface {
       p = new Put(Bytes.toBytes(0));
       p.addColumn(A, A, A);
       verifyMethodResult(SimpleRegionObserver.class,
-        new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
-        tableName, new Boolean[] { false, false, false });
+        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
+          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
+          "getPostCheckAndPutWithFilter" },
+        tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
+
       table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenPut(p);
       verifyMethodResult(SimpleRegionObserver.class,
-        new String[] { "hadPreCheckAndPut", "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut" },
-        tableName, new Boolean[] { true, true, true });
+        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
+          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
+          "getPostCheckAndPutWithFilter" },
+        tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
+
+      table.checkAndMutate(Bytes.toBytes(0),
+          new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
+        .thenPut(p);
+      verifyMethodResult(SimpleRegionObserver.class,
+        new String[] { "getPreCheckAndPut", "getPreCheckAndPutAfterRowLock", "getPostCheckAndPut",
+          "getPreCheckAndPutWithFilter", "getPreCheckAndPutWithFilterAfterRowLock",
+          "getPostCheckAndPutWithFilter" },
+        tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
     } finally {
       util.deleteTable(tableName);
     }
@@ -285,14 +301,29 @@ public class TestRegionObserverInterface {
       Delete d = new Delete(Bytes.toBytes(0));
       table.delete(d);
       verifyMethodResult(
-        SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
-            "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
-        tableName, new Boolean[] { false, false, false });
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
+          "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
+          "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
+          "getPostCheckAndDeleteWithFilter" },
+        tableName, new Integer[] { 0, 0, 0, 0, 0, 0 });
+
       table.checkAndMutate(Bytes.toBytes(0), A).qualifier(A).ifEquals(A).thenDelete(d);
       verifyMethodResult(
-        SimpleRegionObserver.class, new String[] { "hadPreCheckAndDelete",
-            "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete" },
-        tableName, new Boolean[] { true, true, true });
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
+          "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
+          "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
+          "getPostCheckAndDeleteWithFilter" },
+        tableName, new Integer[] { 1, 1, 1, 0, 0, 0 });
+
+      table.checkAndMutate(Bytes.toBytes(0),
+          new SingleColumnValueFilter(A, A, CompareOperator.EQUAL, A))
+        .thenDelete(d);
+      verifyMethodResult(
+        SimpleRegionObserver.class, new String[] { "getPreCheckAndDelete",
+          "getPreCheckAndDeleteAfterRowLock", "getPostCheckAndDelete",
+          "getPreCheckAndDeleteWithFilter", "getPreCheckAndDeleteWithFilterAfterRowLock",
+          "getPostCheckAndDeleteWithFilter" },
+        tableName, new Integer[] { 1, 1, 1, 1, 1, 1 });
     } finally {
       util.deleteTable(tableName);
       table.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
index 37a1236..5a1315d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 
 /**
@@ -274,6 +275,11 @@ public class RegionAsTable implements Table {
   }
 
   @Override
+  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     throw new UnsupportedOperationException();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8ed1c6b..f58b82a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.SubstringComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -2149,6 +2150,128 @@ public class TestHRegion {
     assertEquals(0, r.size());
   }
 
+  @Test
+  public void testCheckAndMutate_WithFilters() throws Throwable {
+    final byte[] FAMILY = Bytes.toBytes("fam");
+
+    // Set up a region with a single column family
+    this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+    // Seed one row with columns A=a, B=b, C=c
+    Put put = new Put(row);
+    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
+    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
+    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
+    region.put(put);
+
+    // Put of column D is expected to succeed: both filters (A == a, B == b) match the row
+    boolean ok = region.checkAndMutate(row,
+      new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))
+      ),
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+    assertTrue(ok);
+
+    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+    // Put of column E is expected to fail: B holds "b", so the B == c filter does not match
+    ok = region.checkAndMutate(row,
+      new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("c"))
+      ),
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
+    assertFalse(ok);
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
+
+    // Delete of column D is expected to succeed: both filters match again
+    ok = region.checkAndMutate(row,
+      new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))
+      ),
+      new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")));
+    assertTrue(ok);
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
+
+    // RowMutations (put E, delete A) is expected to succeed with the same matching filters
+    ok = region.checkAndRowMutate(row,
+      new FilterList(
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+          Bytes.toBytes("a")),
+        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+          Bytes.toBytes("b"))
+      ),
+      new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
+    assertTrue(ok);
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
+    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
+  }
+
+  @Test
+  public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
+    final byte[] FAMILY = Bytes.toBytes("fam");
+
+    // Set up a region with a single column family
+    this.region = initHRegion(tableName, method, CONF, FAMILY);
+
+    // Put column A with an explicit timestamp of 100
+    region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+    // Expected to succeed: the filter matches and between(0, 101) admits timestamp 100
+    boolean ok = region.checkAndMutate(row,
+      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+        Bytes.toBytes("a")),
+      TimeRange.between(0, 101),
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+    assertTrue(ok);
+
+    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
+    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+    // Expected to fail: between(0, 100) leaves out timestamp 100, so C is not written
+    ok = region.checkAndMutate(row,
+      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+        Bytes.toBytes("a")),
+      TimeRange.between(0, 100),
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+    assertFalse(ok);
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
+
+    // RowMutations (put D, delete A) is expected to succeed with between(0, 101)
+    ok = region.checkAndRowMutate(row,
+      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+        Bytes.toBytes("a")),
+      TimeRange.between(0, 101),
+      new RowMutations(row)
+        .add((Mutation) new Put(row)
+          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
+    assertTrue(ok);
+
+    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
+    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
+  }
+
   // ////////////////////////////////////////////////////////////////////////////
   // Delete tests
   // ////////////////////////////////////////////////////////////////////////////
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
index 6db9474..942e232 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftTable.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.thrift2.ThriftUtilities;
@@ -429,6 +430,11 @@ public class ThriftTable implements Table {
   }
 
   @Override
+  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
+    throw new NotImplementedException("Implement later");
+  }
+
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     TRowMutations tRowMutations = ThriftUtilities.rowMutationsFromHBase(rm);
     try {