You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/05/23 01:01:12 UTC

[GitHub] [hbase] brfrn169 commented on a change in pull request #1648: HBASE-8458 Support for batch version of checkAndMutate()

brfrn169 commented on a change in pull request #1648:
URL: https://github.com/apache/hbase/pull/1648#discussion_r429218657



##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete}
+ * and {@link RowMutations} are supported.
+ * <p>
+ * This has a fluent style API to instantiate it, the code is like:
+ * <pre>
+ * <code>
+ * // A CheckAndMutate operation where do the specified action if the column (specified by the
+ * // family and the qualifier) of the row equals to the specified value
+ * CheckAndMutate checkAndMutate = new CheckAndMutate(row)
+ *   .ifEquals(family, qualifier, value)
+ *   .action(put);
+ *
+ * // A CheckAndMutate operation where do the specified action if the column (specified by the
+ * // family and the qualifier) of the row doesn't exist
+ * CheckAndMutate checkAndMutate = new CheckAndMutate(row)
+ *   .ifNotExists(family, qualifier)
+ *   .action(put);
+ *
+ * // A CheckAndMutate operation where do the specified action if the row matches the filter
+ * CheckAndMutate checkAndMutate = new CheckAndMutate(row)
+ *   .ifMatches(filter)
+ *   .action(delete);
+ * </code>
+ * </pre>
+ */
+@InterfaceAudience.Public
+public final class CheckAndMutate extends Mutation {

Review comment:
       Yes, I think I should add `InterfaceStability.Evolving`. Thanks.

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
##########
@@ -344,6 +360,35 @@ default CheckAndMutateBuilder ifEquals(byte[] value) {
     CompletableFuture<Boolean> thenMutate(RowMutations mutation);
   }
 
+  /**
+   * checkAndMutate that atomically checks if a row matches the specified condition. If it does,
+   * it performs the specified action.
+   *
+   * @param checkAndMutate The CheckAndMutate object.
+   * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate.
+   */
+  CompletableFuture<Boolean> checkAndMutate(CheckAndMutate checkAndMutate);
+
+  /**
+   * Batch version of checkAndMutate.

Review comment:
       Yes, that's correct. I will add that to the Javadoc of the new batch-version checkAndMutate methods. Thanks.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -2827,33 +2904,104 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
           quota.close();
           continue;
         }
-        // How does this call happen?  It may need some work to play well w/ the surroundings.
-        // Need to return an item per Action along w/ Action index.  TODO.
+
         try {
-          if (request.hasCondition()) {
-            Condition condition = request.getCondition();
-            byte[] row = condition.getRow().toByteArray();
-            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
-            byte[] qualifier = condition.hasQualifier() ?
-              condition.getQualifier().toByteArray() : null;
-            CompareOperator op = condition.hasCompareType() ?
-              CompareOperator.valueOf(condition.getCompareType().name()) : null;
-            ByteArrayComparable comparator = condition.hasComparator() ?
-              ProtobufUtil.toComparator(condition.getComparator()) : null;
-            Filter filter = condition.hasFilter() ?
-              ProtobufUtil.toFilter(condition.getFilter()) : null;
-            TimeRange timeRange = condition.hasTimeRange() ?
-              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
-              TimeRange.allTime();
+          Condition condition = regionAction.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+          byte[] qualifier = condition.hasQualifier() ?
+            condition.getQualifier().toByteArray() : null;
+          CompareOperator op = condition.hasCompareType() ?
+            CompareOperator.valueOf(condition.getCompareType().name()) : null;
+          ByteArrayComparable comparator = condition.hasComparator() ?
+            ProtobufUtil.toComparator(condition.getComparator()) : null;
+          Filter filter = condition.hasFilter() ?
+            ProtobufUtil.toFilter(condition.getFilter()) : null;
+          TimeRange timeRange = condition.hasTimeRange() ?
+            ProtobufUtil.toTimeRange(condition.getTimeRange()) :
+            TimeRange.allTime();
+
+          boolean processed;
+          if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+            // RowMutations
             processed =
               checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
                 qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
                 spaceQuotaEnforcement);
           } else {
-            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
-              cellScanner, spaceQuotaEnforcement);
-            processed = Boolean.TRUE;
+            if (regionAction.getActionList().isEmpty()) {

Review comment:
       Yes, we should. If this action list is empty, an NPE will happen. Actually, the regionAction should always have one action when the Java API is used correctly. However, with another client the action list can be empty (though I think that would be a bug in the client), so I put this check here.

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
##########
@@ -411,6 +427,29 @@ default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter
     boolean thenMutate(RowMutations mutation) throws IOException;
   }
 
+  /**
+   * checkAndMutate that atomically checks if a row matches the specified condition. If it does,
+   * it performs the specified action.
+   *
+   * @param checkAndMutate The CheckAndMutate object.
+   * @return boolean that represents the result for the CheckAndMutate.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  default boolean checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+    return checkAndMutate(Collections.singletonList(checkAndMutate))[0];
+  }
+
+  /**
+   * Batch version of checkAndMutate.
+   *
+   * @param checkAndMutates The list of CheckAndMutate.
+   * @return A array of boolean that represents the result for each CheckAndMutate.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  default boolean[] checkAndMutate(List<CheckAndMutate> checkAndMutates) throws IOException {
+    throw new NotImplementedException("Add an implementation!");

Review comment:
       The other batch-version methods (Get, Put, etc.) do the same thing. I just aligned this method with them.

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
##########
@@ -599,11 +612,748 @@ public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
   }
 
   @Test(expected = NullPointerException.class)
-  public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithoutConditionForOldApi() {
     getTable.get().checkAndMutate(row, FAMILY)
       .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
   }
 
+  // Tests for new checkAndMutate API
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  @Test
+  public void testCheckAndPut() throws InterruptedException, ExecutionException {
+    AsyncTable<?> table = getTable.get();
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    int count = 10;
+    CountDownLatch latch = new CountDownLatch(count);
+
+    IntStream.range(0, count)
+      .forEach(i -> table.checkAndMutate(new CheckAndMutate(row)
+          .ifNotExists(FAMILY, QUALIFIER)
+          .action(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))))
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          latch.countDown();
+        }));
+    latch.await();
+    assertEquals(1, successCount.get());
+    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
+    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  @Test
+  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
+    AsyncTable<?> table = getTable.get();
+    int count = 10;
+    CountDownLatch putLatch = new CountDownLatch(count + 1);
+    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
+    IntStream.range(0, count)
+      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
+        .thenRun(() -> putLatch.countDown()));
+    putLatch.await();
+
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    CountDownLatch deleteLatch = new CountDownLatch(count);
+
+    IntStream.range(0, count)
+      .forEach(i -> table.checkAndMutate(new CheckAndMutate(row)
+          .ifEquals(FAMILY, QUALIFIER, VALUE)
+          .action(
+            new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          deleteLatch.countDown();
+        }));
+    deleteLatch.await();
+    assertEquals(1, successCount.get());
+    Result result = table.get(new Get(row)).get();
+    IntStream.range(0, count).forEach(i -> {
+      if (i == successIndex.get()) {
+        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
+      } else {
+        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
+      }
+    });
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  @Test
+  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
+    AsyncTable<?> table = getTable.get();
+    int count = 10;
+    CountDownLatch putLatch = new CountDownLatch(count + 1);
+    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
+    IntStream.range(0, count)
+      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
+        .thenRun(() -> putLatch.countDown()));
+    putLatch.await();
+
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger successIndex = new AtomicInteger(-1);
+    CountDownLatch mutateLatch = new CountDownLatch(count);
+    IntStream.range(0, count).forEach(i -> {
+      RowMutations mutation = new RowMutations(row);
+      try {
+        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));

Review comment:
       No, we need the cast here, although it's ugly 😞
   
   `RowMutations` class has several overloaded `add` methods as follows:
   ```
   1. @Deprecated public void add(Put p) throws IOException 
   2. @Deprecated public void add(Delete d) throws IOException
   3. public RowMutations add(Mutation mutation) throws IOException
   4. public RowMutations add(List<? extends Mutation> mutations) throws IOException
   ```
   I want to use the 3rd method above here. However, without the cast to `Mutation`, a deprecated overload is selected instead (for the `Delete` argument here, that is the 2nd method). That's why we need the cast.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -2793,18 +2793,93 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
 
     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
 
-    // this will contain all the cells that we need to return. It's created later, if needed.
-    List<CellScannable> cellsToReturn = null;
     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
     RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
-    Boolean processed = null;
-    RegionScannersCloseCallBack closeCallBack = null;
-    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
     this.rpcMultiRequestCount.increment();
     this.requestCount.increment();
+    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
+
+    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
+    // following logic is for backward compatibility as old clients still use
+    // MultiRequest#condition in case of checkAndMutate with RowMutations.
+    if (request.hasCondition()) {
+      if (request.getRegionActionList().isEmpty()) {
+        // If the region action list is empty, do nothing.
+        responseBuilder.setProcessed(true);
+        return responseBuilder.build();
+      }
+
+      RegionAction regionAction = request.getRegionAction(0);
+
+      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
+      // we can assume regionAction.getAtomic() is true here.
+      assert regionAction.getAtomic();
+
+      OperationQuota quota;
+      HRegion region;
+      RegionSpecifier regionSpecifier = regionAction.getRegion();
+
+      try {
+        region = getRegion(regionSpecifier);
+        quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
+      } catch (IOException e) {
+        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
+        return responseBuilder.build();
+      }
+
+      boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
+      // We only allow replication in standby state and it will not set the atomic flag.
+      if (rejectIfFromClient) {
+        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
+          new DoNotRetryIOException(
+            region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
+        quota.close();
+        return responseBuilder.build();
+      }
+
+      try {
+        Condition condition = request.getCondition();
+        byte[] row = condition.getRow().toByteArray();
+        byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+        byte[] qualifier = condition.hasQualifier() ?
+          condition.getQualifier().toByteArray() : null;
+        CompareOperator op = condition.hasCompareType() ?
+          CompareOperator.valueOf(condition.getCompareType().name()) : null;
+        ByteArrayComparable comparator = condition.hasComparator() ?
+          ProtobufUtil.toComparator(condition.getComparator()) : null;
+        Filter filter = condition.hasFilter() ?
+          ProtobufUtil.toFilter(condition.getFilter()) : null;
+        TimeRange timeRange = condition.hasTimeRange() ?
+          ProtobufUtil.toTimeRange(condition.getTimeRange()) :
+          TimeRange.allTime();
+        boolean processed =
+          checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
+            qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
+            spaceQuotaEnforcement);
+        responseBuilder.setProcessed(processed);
+      } catch (IOException e) {
+        rpcServer.getMetrics().exception(e);
+        // As it's an atomic operation with a condition, we may expect it's a global failure.
+        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
+      }
+
+      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+      quota.close();

Review comment:
       Yes, that's right. Thank you for pointing this out!

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -2827,33 +2904,104 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
           quota.close();
           continue;
         }
-        // How does this call happen?  It may need some work to play well w/ the surroundings.
-        // Need to return an item per Action along w/ Action index.  TODO.
+
         try {
-          if (request.hasCondition()) {
-            Condition condition = request.getCondition();
-            byte[] row = condition.getRow().toByteArray();
-            byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
-            byte[] qualifier = condition.hasQualifier() ?
-              condition.getQualifier().toByteArray() : null;
-            CompareOperator op = condition.hasCompareType() ?
-              CompareOperator.valueOf(condition.getCompareType().name()) : null;
-            ByteArrayComparable comparator = condition.hasComparator() ?
-              ProtobufUtil.toComparator(condition.getComparator()) : null;
-            Filter filter = condition.hasFilter() ?
-              ProtobufUtil.toFilter(condition.getFilter()) : null;
-            TimeRange timeRange = condition.hasTimeRange() ?
-              ProtobufUtil.toTimeRange(condition.getTimeRange()) :
-              TimeRange.allTime();
+          Condition condition = regionAction.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.hasFamily() ? condition.getFamily().toByteArray() : null;
+          byte[] qualifier = condition.hasQualifier() ?
+            condition.getQualifier().toByteArray() : null;
+          CompareOperator op = condition.hasCompareType() ?
+            CompareOperator.valueOf(condition.getCompareType().name()) : null;
+          ByteArrayComparable comparator = condition.hasComparator() ?
+            ProtobufUtil.toComparator(condition.getComparator()) : null;
+          Filter filter = condition.hasFilter() ?
+            ProtobufUtil.toFilter(condition.getFilter()) : null;
+          TimeRange timeRange = condition.hasTimeRange() ?
+            ProtobufUtil.toTimeRange(condition.getTimeRange()) :
+            TimeRange.allTime();
+
+          boolean processed;
+          if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+            // RowMutations
             processed =
               checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
                 qualifier, op, comparator, filter, timeRange, regionActionResultBuilder,
                 spaceQuotaEnforcement);
           } else {
-            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
-              cellScanner, spaceQuotaEnforcement);
-            processed = Boolean.TRUE;
+            if (regionAction.getActionList().isEmpty()) {
+              // If the region action list is empty, do nothing.
+              regionActionResultBuilder.setProcessed(true);
+              continue;
+            }
+            Action action = regionAction.getAction(0);
+            if (action.hasGet()) {
+              throw new DoNotRetryIOException("CheckAndMutate doesn't support GET="
+                + action.getGet());
+            }
+            MutationProto mutation = action.getMutation();
+            switch (mutation.getMutateType()) {
+              case PUT:
+                Put put = ProtobufUtil.toPut(mutation, cellScanner);
+                checkCellSizeLimit(region, put);
+                // Throws an exception when violated
+                spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
+                quota.addMutation(put);
+
+                if (filter != null) {
+                  processed = region.checkAndMutate(row, filter, timeRange, put);
+                } else {
+                  processed = region.checkAndMutate(row, family, qualifier, op, comparator,
+                    timeRange, put);
+                }
+                break;
+
+              case DELETE:
+                Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+                checkCellSizeLimit(region, delete);
+                spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
+                quota.addMutation(delete);
+
+                if (filter != null) {
+                  processed = region.checkAndMutate(row, filter, timeRange, delete);
+                } else {
+                  processed = region.checkAndMutate(row, family, qualifier, op, comparator,
+                    timeRange, delete);
+                }
+                break;
+
+              default:
+                throw new DoNotRetryIOException("CheckAndMutate doesn't support " +
+                  mutation.getMutateType());
+            }
+
+            // To unify the response format with doNonAtomicRegionMutation and read through
+            // client's AsyncProcess we have to add an empty result instance per operation
+            regionActionResultBuilder.addResultOrException(ClientProtos.ResultOrException
+              .newBuilder().setIndex(0).build());
           }

Review comment:
       Sure, I will do that. Thanks.

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete}
+ * and {@link RowMutations} are supported.
+ * <p>
+ * This has a fluent style API to instantiate it, the code is like:
+ * <pre>
+ * <code>
+ * // A CheckAndMutate operation where do the specified action if the column (specified by the
+ * // family and the qualifier) of the row equals to the specified value
+ * CheckAndMutate checkAndMutate = new CheckAndMutate(row)
+ *   .ifEquals(family, qualifier, value)
+ *   .action(put);
+ *
+ * // A CheckAndMutate operation where do the specified action if the column (specified by the
+ * // family and the qualifier) of the row doesn't exist
+ * CheckAndMutate checkAndMutate = new CheckAndMutate(row)
+ *   .ifNotExists(family, qualifier)
+ *   .action(put);
+ *
+ * // A CheckAndMutate operation where do the specified action if the row matches the filter
+ * CheckAndMutate checkAndMutate = new CheckAndMutate(row)
+ *   .ifMatches(filter)
+ *   .action(delete);
+ * </code>
+ * </pre>
+ */
+@InterfaceAudience.Public
+public final class CheckAndMutate extends Mutation {
+
+  private byte[] family;
+  private byte[] qualifier;
+  private CompareOperator op;
+  private byte[] value;
+  private Filter filter;
+  private TimeRange timeRange;
+  private Row action;
+
+  public CheckAndMutate(byte[] row) {
+    super(row, HConstants.LATEST_TIMESTAMP, Collections.emptyNavigableMap());
+  }
+
+  /**
+   * Check for lack of column
+   *
+   * @param family family to check
+   * @param qualifier qualifier to check
+   * @return the CheckAndMutate object
+   */
+  public CheckAndMutate ifNotExists(byte[] family, byte[] qualifier) {
+    return ifEquals(family, qualifier, null);
+  }
+
+  /**
+   * Check for equality
+   *
+   * @param family family to check
+   * @param qualifier qualifier to check
+   * @param value the expected value
+   * @return the CheckAndMutate object
+   */
+  public CheckAndMutate ifEquals(byte[] family, byte[] qualifier, byte[] value) {
+    return ifMatches(family, qualifier, CompareOperator.EQUAL, value);
+  }
+
+  /**
+   * @param family family to check
+   * @param qualifier qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value the expected value
+   * @return the CheckAndMutate object
+   */
+  public CheckAndMutate ifMatches(byte[] family, byte[] qualifier, CompareOperator compareOp,
+    byte[] value) {
+    this.family = Preconditions.checkNotNull(family, "family is null");
+    this.qualifier = qualifier;
+    this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
+    this.value = value;
+    return this;
+  }
+
+  /**
+   * @param filter filter to check
+   * @return the CheckAndMutate object
+   */
+  public CheckAndMutate ifMatches(Filter filter) {
+    this.filter = Preconditions.checkNotNull(filter, "filter is null");
+    return this;
+  }
+
+  /**
+   * @param timeRange time range to check
+   * @return the CheckAndMutate object
+   */
+  public CheckAndMutate timeRange(TimeRange timeRange) {
+    this.timeRange = timeRange;
+    return this;
+  }
+
+  /**
+   * Currently only supports {@link Put}, {@link Delete} and {@link RowMutations}.
+   *
+   * @param action action to be performed if the specified condition matches
+   * @return the CheckAndMutate object
+   */
+  public CheckAndMutate action(Row action) {
+    this.action = Preconditions.checkNotNull(action, "action is null");

Review comment:
       I will make a `Builder` class, which will resolve this. Thanks.

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java
##########
@@ -366,10 +348,585 @@ public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
   }
 
   @Test(expected = NullPointerException.class)
-  public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
+  @Deprecated
+  public void testCheckAndMutateWithoutConditionForOldApi() throws Throwable {
     try (Table table = createTable()) {
       table.checkAndMutate(ROWKEY, FAMILY) // no condition is set before thenPut(...)
         .thenPut(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
     }
   }
+
+  // Tests for new checkAndMutate API
+
+  @Test
+  public void testCheckAndMutate() throws Throwable {
+    try (Table table = createTable()) {
+      // seed the table with one row through the putOneRow helper
+      putOneRow(table);
+      // get row back and assert the values
+      getOneRowAndAssertAllExist(table);
+
+      // apply a RowMutations (per the helper name, with column C deleted) guarded by A == "a"
+      RowMutations rm = makeRowMutationsWithColumnCDeleted();
+      boolean res = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .action(rm));
+      assertTrue(res); // checkAndMutate is expected to report success
+
+      // get row back and assert the values
+      getOneRowAndAssertAllButCExist(table);
+
+      // Test that we get a region level exception when the mutations are bogus
+      try {
+        rm = getBogusRowMutations();
+        table.checkAndMutate(new CheckAndMutate(ROWKEY)
+          .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+          .action(rm));
+        fail("Expected NoSuchColumnFamilyException");
+      } catch (NoSuchColumnFamilyException e) {
+        // expected: the exception surfaced directly
+      } catch (RetriesExhaustedException e) { // also accepted when it arrives as the cause
+        assertThat(e.getCause(), instanceOf(NoSuchColumnFamilyException.class));
+      }
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithSingleFilter() throws Throwable {
+    try (Table table = createTable()) {
+      // seed the table with one row through the putOneRow helper
+      putOneRow(table);
+      // get row back and assert the values
+      getOneRowAndAssertAllExist(table);
+
+      // Put with success: filter A == "a" is expected to match, so column D is written
+      boolean ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY,
+          Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+      // Put with failure: filter A == "b" is expected not to match, so column E stays absent
+      ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+        CompareOperator.EQUAL, Bytes.toBytes("b")))
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
+
+      // Delete with success: removes the column D written above
+      ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+          CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .action(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
+      assertTrue(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
+
+      // Mutate with success: one RowMutations that puts D and deletes A, guarded by B == "b"
+      ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
+          CompareOperator.EQUAL, Bytes.toBytes("b")))
+        .action(new RowMutations(ROWKEY)
+          .add((Mutation) new Put(ROWKEY)
+            .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+          .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
+      assertTrue(ok);
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
+    try (Table table = createTable()) {
+      // seed the table with one row through the putOneRow helper
+      putOneRow(table);
+      // get row back and assert the values
+      getOneRowAndAssertAllExist(table);
+
+      // Put with success: both filters (A == "a", B == "b") are expected to match
+      boolean ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+      // Put with failure: the second filter (B == "c") is expected not to match
+      ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+            Bytes.toBytes("c"))))
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("E"))));
+
+      // Delete with success: removes the column D written above
+      ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new FilterList(
+            new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+              Bytes.toBytes("a")),
+            new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+              Bytes.toBytes("b"))))
+        .action(new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("D"))));
+      assertTrue(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"))));
+
+      // Mutate with success: one RowMutations that puts D and deletes A
+      ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new FilterList(
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+            Bytes.toBytes("a")),
+          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
+            Bytes.toBytes("b"))))
+        .action(new RowMutations(ROWKEY)
+          .add((Mutation) new Put(ROWKEY)
+            .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
+          .add((Mutation) new Delete(ROWKEY).addColumns(FAMILY, Bytes.toBytes("A")))));
+      assertTrue(ok);
+
+      result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D")));
+      assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
+    try (Table table = createTable()) {
+      // Write column A with an explicit timestamp of 100
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+      // Put with success: the TimestampsFilter lists 100, the timestamp written above
+      boolean ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new FilterList(
+          new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+          new TimestampsFilter(Collections.singletonList(100L))))
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      // Put with failure: the TimestampsFilter lists 101, which was never written
+      ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new FilterList(
+          new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
+          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
+          new TimestampsFilter(Collections.singletonList(101L))))
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+    }
+  }
+
+  @Test
+  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
+    try (Table table = createTable()) {
+      // Write column A with an explicit timestamp of 100
+      table.put(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
+
+      // Put with success: with bound 101 the cell at timestamp 100 is expected to be in range
+      boolean ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY,
+          Bytes.toBytes("A"), CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .timeRange(TimeRange.between(0, 101))
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))));
+      assertTrue(ok);
+
+      Result result = table.get(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("B")));
+      assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+      // Put with failure: same filter, but with bound 100 the cell is expected out of range
+      ok = table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
+          CompareOperator.EQUAL, Bytes.toBytes("a")))
+        .timeRange(TimeRange.between(0, 100)) // consistent with an exclusive upper bound
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))));
+      assertFalse(ok);
+
+      assertFalse(table.exists(new Get(ROWKEY).addColumn(FAMILY, Bytes.toBytes("C"))));
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckAndMutateWithoutAction() throws Throwable {
+    try (Table table = createTable()) {
+      table.checkAndMutate(new CheckAndMutate(ROWKEY)
+        .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); // no action(...) is set
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCheckAndMutateWithoutCondition() throws Throwable {
+    try (Table table = createTable()) {
+      table.checkAndMutate(new CheckAndMutate(ROWKEY) // no ifEquals/ifMatches condition is set
+        .action(new Put(ROWKEY).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
+    }
+  }

Review comment:
       Thanks! Making a `Builder` class is a good idea. I will make this change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org