You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2021/03/30 14:04:57 UTC

[hbase] branch branch-2 updated: HBASE-25703 Support conditional update in MultiRowMutationEndpoint (#3107)

This is an automated email from the ASF dual-hosted git repository.

brfrn169 pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 1d3ea38  HBASE-25703 Support conditional update in MultiRowMutationEndpoint (#3107)
1d3ea38 is described below

commit 1d3ea38f1e2847c055fa964e5df2c039b3b756dd
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Tue Mar 30 23:04:29 2021 +0900

    HBASE-25703 Support conditional update in MultiRowMutationEndpoint (#3107)
    
    Signed-off-by: Michael Stack <st...@apache.org>
---
 .../apache/hadoop/hbase/protobuf/ProtobufUtil.java |  52 ++++
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  35 +++
 .../hbase/shaded/protobuf/RequestConverter.java    |  45 +--
 .../src/main/protobuf/Client.proto                 |   4 +-
 .../src/main/protobuf/MultiRowMutation.proto       |   1 +
 .../coprocessor/MultiRowMutationEndpoint.java      | 176 ++++++++++-
 .../hadoop/hbase/client/TestFromClientSide5.java   | 329 ++++++++++++++++++++-
 .../hbase/client/TestMalformedCellFromClient.java  |   4 +-
 8 files changed, 579 insertions(+), 67 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5bba1e1..dc7f7d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ExtendedCellBuilder;
 import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.SnapshotType;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -751,6 +753,9 @@ public final class ProtobufUtil {
    */
   public static Mutation toMutation(final MutationProto proto) throws IOException {
     MutationType type = proto.getMutateType();
+    if (type == MutationType.INCREMENT) {
+      return toIncrement(proto, null);
+    }
     if (type == MutationType.APPEND) {
       return toAppend(proto, null);
     }
@@ -1790,4 +1795,51 @@ public final class ProtobufUtil {
       .setTo(timeRange.getMax())
       .build();
   }
+
+  public static TimeRange toTimeRange(HBaseProtos.TimeRange timeRange) {
+    if (timeRange == null) {
+      return TimeRange.allTime();
+    }
+    if (timeRange.hasFrom()) {
+      if (timeRange.hasTo()) {
+        return TimeRange.between(timeRange.getFrom(), timeRange.getTo());
+      } else {
+        return TimeRange.from(timeRange.getFrom());
+      }
+    } else {
+      return TimeRange.until(timeRange.getTo());
+    }
+  }
+
+  public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
+    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+    final TimeRange timeRange) throws IOException {
+
+    ClientProtos.Condition.Builder builder = ClientProtos.Condition.newBuilder()
+      .setRow(ByteStringer.wrap(row));
+
+    if (filter != null) {
+      builder.setFilter(ProtobufUtil.toFilter(filter));
+    } else {
+      builder.setFamily(ByteStringer.wrap(family))
+        .setQualifier(ByteStringer.wrap(
+          qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
+        .setComparator(
+          ProtobufUtil.toComparator(new BinaryComparator(value)))
+        .setCompareType(HBaseProtos.CompareType.valueOf(op.name()));
+    }
+
+    return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
+  }
+
+  public static ClientProtos.Condition toCondition(final byte[] row, final Filter filter,
+    final TimeRange timeRange) throws IOException {
+    return toCondition(row, null, null, null, null, filter, timeRange);
+  }
+
+  public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
+    final byte[] qualifier, final CompareOperator op, final byte[] value,
+    final TimeRange timeRange) throws IOException {
+    return toCondition(row, family, qualifier, op, value, null, timeRange);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 9064ded..cb471d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -917,6 +918,9 @@ public final class ProtobufUtil {
    */
   public static Mutation toMutation(final MutationProto proto) throws IOException {
     MutationType type = proto.getMutateType();
+    if (type == MutationType.INCREMENT) {
+      return toIncrement(proto, null);
+    }
     if (type == MutationType.APPEND) {
       return toAppend(proto, null);
     }
@@ -3632,6 +3636,37 @@ public final class ProtobufUtil {
     }
   }
 
+  public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
+    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+    final TimeRange timeRange) throws IOException {
+
+    ClientProtos.Condition.Builder builder = ClientProtos.Condition.newBuilder()
+      .setRow(UnsafeByteOperations.unsafeWrap(row));
+
+    if (filter != null) {
+      builder.setFilter(ProtobufUtil.toFilter(filter));
+    } else {
+      builder.setFamily(UnsafeByteOperations.unsafeWrap(family))
+        .setQualifier(UnsafeByteOperations.unsafeWrap(
+          qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
+        .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)))
+        .setCompareType(HBaseProtos.CompareType.valueOf(op.name()));
+    }
+
+    return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
+  }
+
+  public static ClientProtos.Condition toCondition(final byte[] row, final Filter filter,
+    final TimeRange timeRange) throws IOException {
+    return toCondition(row, null, null, null, null, filter, timeRange);
+  }
+
+  public static ClientProtos.Condition toCondition(final byte[] row, final byte[] family,
+    final byte[] qualifier, final CompareOperator op, final byte[] value,
+    final TimeRange timeRange) throws IOException {
+    return toCondition(row, family, qualifier, op, value, null, timeRange);
+  }
+
   public static List<LogEntry> toBalancerDecisionResponse(
       HBaseProtos.LogEntry logEntry) {
     try {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 21644f4..bdae13b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.master.RegionState;
@@ -102,7 +101,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationPr
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
@@ -257,7 +255,7 @@ public final class RequestConverter {
       builder.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation));
     }
     return builder.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
-      .setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
+      .setCondition(ProtobufUtil.toCondition(row, family, qualifier, op, value, filter, timeRange))
       .build();
   }
 
@@ -271,8 +269,8 @@ public final class RequestConverter {
     final byte[] row, final byte[] family, final byte[] qualifier,
     final CompareOperator op, final byte[] value, final Filter filter, final TimeRange timeRange,
     final RowMutations rowMutations, long nonceGroup, long nonce) throws IOException {
-    return buildMultiRequest(regionName, rowMutations, buildCondition(row, family, qualifier, op,
-      value, filter, timeRange), nonceGroup, nonce);
+    return buildMultiRequest(regionName, rowMutations, ProtobufUtil.toCondition(row, family,
+      qualifier, op, value, filter, timeRange), nonceGroup, nonce);
   }
 
   /**
@@ -666,8 +664,9 @@ public final class RequestConverter {
       getRegionActionBuilderWithRegion(builder, regionName);
 
       CheckAndMutate cam = (CheckAndMutate) action.getAction();
-      builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(),
-        cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange()));
+      builder.setCondition(ProtobufUtil.toCondition(cam.getRow(), cam.getFamily(),
+        cam.getQualifier(), cam.getCompareOp(), cam.getValue(), cam.getFilter(),
+        cam.getTimeRange()));
 
       if (cam.getAction() instanceof Put) {
         actionBuilder.clear();
@@ -832,8 +831,9 @@ public final class RequestConverter {
       getRegionActionBuilderWithRegion(builder, regionName);
 
       CheckAndMutate cam = (CheckAndMutate) action.getAction();
-      builder.setCondition(buildCondition(cam.getRow(), cam.getFamily(), cam.getQualifier(),
-        cam.getCompareOp(), cam.getValue(), cam.getFilter(), cam.getTimeRange()));
+      builder.setCondition(ProtobufUtil.toCondition(cam.getRow(), cam.getFamily(),
+        cam.getQualifier(), cam.getCompareOp(), cam.getValue(), cam.getFilter(),
+        cam.getTimeRange()));
 
       if (cam.getAction() instanceof Put) {
         actionBuilder.clear();
@@ -1188,31 +1188,6 @@ public final class RequestConverter {
   }
 
   /**
-   * Create a protocol buffer Condition
-   *
-   * @return a Condition
-   * @throws IOException
-   */
-  public static Condition buildCondition(final byte[] row, final byte[] family,
-    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
-    final TimeRange timeRange) throws IOException {
-
-    Condition.Builder builder = Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row));
-
-    if (filter != null) {
-      builder.setFilter(ProtobufUtil.toFilter(filter));
-    } else {
-      builder.setFamily(UnsafeByteOperations.unsafeWrap(family))
-        .setQualifier(UnsafeByteOperations.unsafeWrap(
-          qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier))
-        .setComparator(ProtobufUtil.toComparator(new BinaryComparator(value)))
-        .setCompareType(CompareType.valueOf(op.name()));
-    }
-
-    return builder.setTimeRange(ProtobufUtil.toTimeRange(timeRange)).build();
-  }
-
-  /**
    * Create a protocol buffer AddColumnRequest
    *
    * @param tableName
@@ -1245,7 +1220,7 @@ public final class RequestConverter {
       final long nonceGroup,
       final long nonce) {
     DeleteColumnRequest.Builder builder = DeleteColumnRequest.newBuilder();
-    builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
+    builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
     builder.setColumnName(UnsafeByteOperations.unsafeWrap(columnName));
     builder.setNonceGroup(nonceGroup);
     builder.setNonce(nonce);
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 1b8b986..13917b6 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -132,8 +132,8 @@ message GetResponse {
 }
 
 /**
- * Condition to check if the value of a given cell (row,
- * family, qualifier) matches a value via a given comparator.
+ * Condition to check if the value of a given cell (row, family, qualifier) matches a value via a
+ * given comparator or the value of a given cell matches a given filter.
  *
  * Condition is used in check and mutate operations.
  */
diff --git a/hbase-protocol/src/main/protobuf/MultiRowMutation.proto b/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
index d3140e9..571e633 100644
--- a/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
+++ b/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
@@ -37,6 +37,7 @@ message MutateRowsRequest {
   optional uint64 nonce_group = 2;
   optional uint64 nonce = 3;
   optional RegionSpecifier region = 4;
+  repeated Condition condition = 5;
 }
 
 message MutateRowsResponse {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
index d230773..c840d54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
@@ -25,31 +25,43 @@ import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.WrongRegionException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
 
 /**
- * This class demonstrates how to implement atomic multi row transactions using
- * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)}
- * and Coprocessor endpoints.
+ * This class implements atomic multi row transactions using
+ * {@link HRegion#mutateRowsWithLocks(Collection, Collection, long, long)} and Coprocessor
+ * endpoints. We can also specify some conditions to perform conditional update.
  *
  * Defines a protocol to perform multi row transactions.
  * See {@link MultiRowMutationEndpoint} for the implementation.
@@ -60,18 +72,29 @@ import com.google.protobuf.Service;
  * <br>
  * Example:
  * <code>
- * List&lt;Mutation&gt; mutations = ...;
- * Put p1 = new Put(row1);
- * Put p2 = new Put(row2);
+ * Put p = new Put(row1);
+ * Delete d = new Delete(row2);
+ * Increment i = new Increment(row3);
+ * Append a = new Append(row4);
  * ...
- * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p1);
- * Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p2);
+ * Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p);
+ * Mutate m2 = ProtobufUtil.toMutate(MutateType.DELETE, d);
+ * Mutate m3 = ProtobufUtil.toMutate(MutateType.INCREMENT, i);
+ * Mutate m4 = ProtobufUtil.toMutate(MutateType.Append, a);
+ *
  * MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
  * mrmBuilder.addMutationRequest(m1);
  * mrmBuilder.addMutationRequest(m2);
+ * mrmBuilder.addMutationRequest(m3);
+ * mrmBuilder.addMutationRequest(m4);
+ *
+ * // We can also specify conditions to preform conditional update
+ * mrmBuilder.addCondition(ProtobufUtil.toCondition(row, FAMILY, QUALIFIER,
+ *  CompareOperator.EQUAL, value, TimeRange.allTime()));
+ *
  * CoprocessorRpcChannel channel = t.coprocessorService(ROW);
  * MultiRowMutationService.BlockingInterface service =
- *    MultiRowMutationService.newBlockingStub(channel);
+ *   MultiRowMutationService.newBlockingStub(channel);
  * MutateRowsRequest mrm = mrmBuilder.build();
  * service.mutateRows(null, mrm);
  * </code>
@@ -79,11 +102,16 @@ import com.google.protobuf.Service;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
 @InterfaceStability.Evolving
 public class MultiRowMutationEndpoint extends MultiRowMutationService implements RegionCoprocessor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HRegion.class);
+
   private RegionCoprocessorEnvironment env;
+
   @Override
   public void mutateRows(RpcController controller, MutateRowsRequest request,
       RpcCallback<MutateRowsResponse> done) {
     MutateRowsResponse response = MutateRowsResponse.getDefaultInstance();
+
+    List<Region.RowLock> rowLocks = null;
     try {
       // set of rows to lock, sorted to avoid deadlocks
       SortedSet<byte[]> rowsToLock = new TreeSet<>(Bytes.BYTES_COMPARATOR);
@@ -93,7 +121,9 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
         mutations.add(ProtobufUtil.toMutation(m));
       }
 
-      RegionInfo regionInfo = env.getRegion().getRegionInfo();
+      Region region = env.getRegion();
+
+      RegionInfo regionInfo = region.getRegionInfo();
       for (Mutation m : mutations) {
         // check whether rows are in range for this region
         if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
@@ -110,16 +140,134 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
         }
         rowsToLock.add(m.getRow());
       }
-      // call utility method on region
-      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
-      long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
-      env.getRegion().mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+
+      boolean matches = true;
+      if (request.getConditionCount() > 0) {
+        // Get row locks for the mutations and the conditions
+        rowLocks = new ArrayList<>();
+        for (ClientProtos.Condition condition : request.getConditionList()) {
+          rowsToLock.add(condition.getRow().toByteArray());
+        }
+        for (byte[] row : rowsToLock) {
+          try {
+            Region.RowLock rowLock = region.getRowLock(row, false); // write lock
+            rowLocks.add(rowLock);
+          } catch (IOException ioe) {
+            LOGGER.warn("Failed getting lock, row={}, in region {}", Bytes.toStringBinary(row),
+              this, ioe);
+            throw ioe;
+          }
+        }
+
+        // Check if all the conditions match
+        for (ClientProtos.Condition condition : request.getConditionList()) {
+          if (!matches(region, condition)) {
+            matches = false;
+            break;
+          }
+        }
+      }
+
+      if (matches) {
+        // call utility method on region
+        long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
+        long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
+        region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+      }
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
+    } finally {
+      if (rowLocks != null) {
+        // Release the acquired row locks
+        for (Region.RowLock rowLock : rowLocks) {
+          rowLock.release();
+        }
+      }
     }
     done.run(response);
   }
 
+  private boolean matches(Region region, ClientProtos.Condition condition) throws IOException {
+    byte[] row = condition.getRow().toByteArray();
+
+    Filter filter = null;
+    byte[] family = null;
+    byte[] qualifier = null;
+    CompareOperator op = null;
+    ByteArrayComparable comparator = null;
+
+    if (condition.hasFilter()) {
+      filter = ProtobufUtil.toFilter(condition.getFilter());
+    } else {
+      family = condition.getFamily().toByteArray();
+      qualifier = condition.getQualifier().toByteArray();
+      op = CompareOperator.valueOf(condition.getCompareType().name());
+      comparator = ProtobufUtil.toComparator(condition.getComparator());
+    }
+
+    TimeRange timeRange = condition.hasTimeRange() ?
+      ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime();
+
+    Get get = new Get(row);
+    if (family != null) {
+      checkFamily(region, family);
+      get.addColumn(family, qualifier);
+    }
+    if (filter != null) {
+      get.setFilter(filter);
+    }
+    if (timeRange != null) {
+      get.setTimeRange(timeRange.getMin(), timeRange.getMax());
+    }
+
+    List<Cell> result = region.get(get, false);
+    boolean matches = false;
+    if (filter != null) {
+      if (!result.isEmpty()) {
+        matches = true;
+      }
+    } else {
+      boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
+      if (result.isEmpty() && valueIsNull) {
+        matches = true;
+      } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
+        matches = true;
+      } else if (result.size() == 1 && !valueIsNull) {
+        Cell kv = result.get(0);
+        int compareResult = PrivateCellUtil.compareValue(kv, comparator);
+        matches = matches(op, compareResult);
+      }
+    }
+    return matches;
+  }
+
+  private void checkFamily(Region region, byte[] family) throws NoSuchColumnFamilyException {
+    if (!region.getTableDescriptor().hasColumnFamily(family)) {
+      throw new NoSuchColumnFamilyException(
+        "Column family " + Bytes.toString(family) + " does not exist in region " + this
+          + " in table " + region.getTableDescriptor());
+    }
+  }
+
+  private boolean matches(CompareOperator op, int compareResult) {
+    switch (op) {
+      case LESS:
+        return compareResult < 0;
+      case LESS_OR_EQUAL:
+        return compareResult <= 0;
+      case EQUAL:
+        return compareResult == 0;
+      case NOT_EQUAL:
+        return compareResult != 0;
+      case GREATER_OR_EQUAL:
+        return compareResult >= 0;
+      case GREATER:
+        return compareResult > 0;
+      default:
+        throw new RuntimeException("Unknown Compare op " + op.name());
+    }
+  }
+
   @Override
   public Iterable<Service> getServices() {
     return Collections.singleton(this);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
index 19256fd..809fd2a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide5.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.SubstringComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -265,30 +266,330 @@ public class TestFromClientSide5 extends FromClientSideBase {
     LOG.info("Starting testMultiRowMutation");
     final TableName tableName = name.getTableName();
     final byte [] ROW1 = Bytes.toBytes("testRow1");
+    final byte [] ROW2 = Bytes.toBytes("testRow2");
+    final byte [] ROW3 = Bytes.toBytes("testRow3");
 
     try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
-      Put p = new Put(ROW);
-      p.addColumn(FAMILY, QUALIFIER, VALUE);
-      MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
+      // Add initial data
+      t.batch(Arrays.asList(
+        new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE),
+        new Put(ROW2).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(1L)),
+        new Put(ROW3).addColumn(FAMILY, QUALIFIER, VALUE)
+      ), new Object[3]);
 
-      p = new Put(ROW1);
-      p.addColumn(FAMILY, QUALIFIER, VALUE);
-      MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, p);
+      // Execute MultiRowMutation
+      Put put = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+      MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put);
+
+      Delete delete = new Delete(ROW1);
+      MutationProto m2 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+      Increment increment = new Increment(ROW2).addColumn(FAMILY, QUALIFIER, 1L);
+      MutationProto m3 = ProtobufUtil.toMutation(MutationType.INCREMENT, increment);
+
+      Append append = new Append(ROW3).addColumn(FAMILY, QUALIFIER, VALUE);
+      MutationProto m4 = ProtobufUtil.toMutation(MutationType.APPEND, append);
 
       MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
       mrmBuilder.addMutationRequest(m1);
       mrmBuilder.addMutationRequest(m2);
-      MutateRowsRequest mrm = mrmBuilder.build();
+      mrmBuilder.addMutationRequest(m3);
+      mrmBuilder.addMutationRequest(m4);
+
       CoprocessorRpcChannel channel = t.coprocessorService(ROW);
       MultiRowMutationService.BlockingInterface service =
               MultiRowMutationService.newBlockingStub(channel);
-      service.mutateRows(null, mrm);
-      Get g = new Get(ROW);
-      Result r = t.get(g);
-      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));
-      g = new Get(ROW1);
-      r = t.get(g);
-      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));
+      service.mutateRows(null, mrmBuilder.build());
+
+      // Assert
+      Result r = t.get(new Get(ROW));
+      assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+      r = t.get(new Get(ROW1));
+      assertTrue(r.isEmpty());
+
+      r = t.get(new Get(ROW2));
+      assertEquals(2L, Bytes.toLong(r.getValue(FAMILY, QUALIFIER)));
+
+      r = t.get(new Get(ROW3));
+      assertEquals(Bytes.toString(VALUE) + Bytes.toString(VALUE),
+        Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  @Test
+  public void testMultiRowMutationWithSingleConditionWhenConditionMatches() throws Exception {
+    final TableName tableName = name.getTableName();
+    final byte [] ROW1 = Bytes.toBytes("testRow1");
+    final byte [] ROW2 = Bytes.toBytes("testRow2");
+    final byte [] VALUE1 = Bytes.toBytes("testValue1");
+    final byte [] VALUE2 = Bytes.toBytes("testValue2");
+
+    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+      // Add initial data
+      t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2));
+
+      // Execute MultiRowMutation with conditions
+      Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+      MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+      Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+      MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+      Delete delete = new Delete(ROW2);
+      MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+      MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+      mrmBuilder.addMutationRequest(m1);
+      mrmBuilder.addMutationRequest(m2);
+      mrmBuilder.addMutationRequest(m3);
+      mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER,
+        CompareOperator.EQUAL, VALUE2, null));
+
+      CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+      MultiRowMutationService.BlockingInterface service =
+        MultiRowMutationService.newBlockingStub(channel);
+      service.mutateRows(null, mrmBuilder.build());
+
+      // Assert
+      Result r = t.get(new Get(ROW));
+      assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+      r = t.get(new Get(ROW1));
+      assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+      r = t.get(new Get(ROW2));
+      assertTrue(r.isEmpty());
+    }
+  }
+
+  @Test
+  public void testMultiRowMutationWithSingleConditionWhenConditionNotMatch() throws Exception {
+    final TableName tableName = name.getTableName();
+    final byte [] ROW1 = Bytes.toBytes("testRow1");
+    final byte [] ROW2 = Bytes.toBytes("testRow2");
+    final byte [] VALUE1 = Bytes.toBytes("testValue1");
+    final byte [] VALUE2 = Bytes.toBytes("testValue2");
+
+    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+      // Add initial data
+      t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2));
+
+      // Execute MultiRowMutation with conditions
+      Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+      MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+      Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+      MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+      Delete delete = new Delete(ROW2);
+      MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+      MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+      mrmBuilder.addMutationRequest(m1);
+      mrmBuilder.addMutationRequest(m2);
+      mrmBuilder.addMutationRequest(m3);
+      mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER,
+        CompareOperator.EQUAL, VALUE1, null));
+
+      CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+      MultiRowMutationService.BlockingInterface service =
+        MultiRowMutationService.newBlockingStub(channel);
+      service.mutateRows(null, mrmBuilder.build());
+
+      // Assert
+      Result r = t.get(new Get(ROW));
+      assertTrue(r.isEmpty());
+
+      r = t.get(new Get(ROW1));
+      assertTrue(r.isEmpty());
+
+      r = t.get(new Get(ROW2));
+      assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  @Test
+  public void testMultiRowMutationWithMultipleConditionsWhenConditionsMatch() throws Exception {
+    final TableName tableName = name.getTableName();
+    final byte [] ROW1 = Bytes.toBytes("testRow1");
+    final byte [] ROW2 = Bytes.toBytes("testRow2");
+    final byte [] VALUE1 = Bytes.toBytes("testValue1");
+    final byte [] VALUE2 = Bytes.toBytes("testValue2");
+
+    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+      // Add initial data
+      t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2));
+
+      // Execute MultiRowMutation with conditions
+      Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+      MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+      Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+      MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+      Delete delete = new Delete(ROW2);
+      MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+      MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+      mrmBuilder.addMutationRequest(m1);
+      mrmBuilder.addMutationRequest(m2);
+      mrmBuilder.addMutationRequest(m3);
+      mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW, FAMILY, QUALIFIER,
+        CompareOperator.EQUAL, null, null));
+      mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER,
+        CompareOperator.EQUAL, VALUE2, null));
+
+      CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+      MultiRowMutationService.BlockingInterface service =
+        MultiRowMutationService.newBlockingStub(channel);
+      service.mutateRows(null, mrmBuilder.build());
+
+      // Assert
+      Result r = t.get(new Get(ROW));
+      assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+      r = t.get(new Get(ROW1));
+      assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+      r = t.get(new Get(ROW2));
+      assertTrue(r.isEmpty());
+    }
+  }
+
+  @Test
+  public void testMultiRowMutationWithMultipleConditionsWhenConditionsNotMatch() throws Exception {
+    final TableName tableName = name.getTableName();
+    final byte [] ROW1 = Bytes.toBytes("testRow1");
+    final byte [] ROW2 = Bytes.toBytes("testRow2");
+    final byte [] VALUE1 = Bytes.toBytes("testValue1");
+    final byte [] VALUE2 = Bytes.toBytes("testValue2");
+
+    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+      // Add initial data
+      t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2));
+
+      // Execute MultiRowMutation with conditions
+      Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+      MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+      Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+      MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+      Delete delete = new Delete(ROW2);
+      MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+      MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+      mrmBuilder.addMutationRequest(m1);
+      mrmBuilder.addMutationRequest(m2);
+      mrmBuilder.addMutationRequest(m3);
+      mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW1, FAMILY, QUALIFIER,
+        CompareOperator.EQUAL, null, null));
+      mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, FAMILY, QUALIFIER,
+        CompareOperator.EQUAL, VALUE1, null));
+
+      CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+      MultiRowMutationService.BlockingInterface service =
+        MultiRowMutationService.newBlockingStub(channel);
+      service.mutateRows(null, mrmBuilder.build());
+
+      // Assert
+      Result r = t.get(new Get(ROW));
+      assertTrue(r.isEmpty());
+
+      r = t.get(new Get(ROW1));
+      assertTrue(r.isEmpty());
+
+      r = t.get(new Get(ROW2));
+      assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  @Test
+  public void testMultiRowMutationWithFilterConditionWhenConditionMatches() throws Exception {
+    final TableName tableName = name.getTableName();
+    final byte [] ROW1 = Bytes.toBytes("testRow1");
+    final byte [] ROW2 = Bytes.toBytes("testRow2");
+    final byte [] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+    final byte [] VALUE1 = Bytes.toBytes("testValue1");
+    final byte [] VALUE2 = Bytes.toBytes("testValue2");
+    final byte [] VALUE3 = Bytes.toBytes("testValue3");
+
+    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+      // Add initial data
+      t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2)
+        .addColumn(FAMILY, QUALIFIER2, VALUE3));
+
+      // Execute MultiRowMutation with conditions
+      Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+      MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+      Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+      MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+      Delete delete = new Delete(ROW2);
+      MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+      MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+      mrmBuilder.addMutationRequest(m1);
+      mrmBuilder.addMutationRequest(m2);
+      mrmBuilder.addMutationRequest(m3);
+      mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, new FilterList(
+        new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOperator.EQUAL, VALUE2),
+        new SingleColumnValueFilter(FAMILY, QUALIFIER2, CompareOperator.EQUAL, VALUE3)), null));
+
+      CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+      MultiRowMutationService.BlockingInterface service =
+        MultiRowMutationService.newBlockingStub(channel);
+      service.mutateRows(null, mrmBuilder.build());
+
+      // Assert
+      Result r = t.get(new Get(ROW));
+      assertEquals(Bytes.toString(VALUE), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+      r = t.get(new Get(ROW1));
+      assertEquals(Bytes.toString(VALUE1), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
+
+      r = t.get(new Get(ROW2));
+      assertTrue(r.isEmpty());
+    }
+  }
+
+  @Test
+  public void testMultiRowMutationWithFilterConditionWhenConditionNotMatch() throws Exception {
+    final TableName tableName = name.getTableName();
+    final byte [] ROW1 = Bytes.toBytes("testRow1");
+    final byte [] ROW2 = Bytes.toBytes("testRow2");
+    final byte [] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+    final byte [] VALUE1 = Bytes.toBytes("testValue1");
+    final byte [] VALUE2 = Bytes.toBytes("testValue2");
+    final byte [] VALUE3 = Bytes.toBytes("testValue3");
+
+    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
+      // Add initial data
+      t.put(new Put(ROW2).addColumn(FAMILY, QUALIFIER, VALUE2)
+        .addColumn(FAMILY, QUALIFIER2, VALUE3));
+
+      // Execute MultiRowMutation with conditions
+      Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
+      MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, put1);
+      Put put2 = new Put(ROW1).addColumn(FAMILY, QUALIFIER, VALUE1);
+      MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, put2);
+      Delete delete = new Delete(ROW2);
+      MutationProto m3 = ProtobufUtil.toMutation(MutationType.DELETE, delete);
+
+      MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+      mrmBuilder.addMutationRequest(m1);
+      mrmBuilder.addMutationRequest(m2);
+      mrmBuilder.addMutationRequest(m3);
+      mrmBuilder.addCondition(ProtobufUtil.toCondition(ROW2, new FilterList(
+        new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOperator.EQUAL, VALUE2),
+        new SingleColumnValueFilter(FAMILY, QUALIFIER2, CompareOperator.EQUAL, VALUE2)), null));
+
+      CoprocessorRpcChannel channel = t.coprocessorService(ROW);
+      MultiRowMutationService.BlockingInterface service =
+        MultiRowMutationService.newBlockingStub(channel);
+      service.mutateRows(null, mrmBuilder.build());
+
+      // Assert
+      Result r = t.get(new Get(ROW));
+      assertTrue(r.isEmpty());
+
+      r = t.get(new Get(ROW1));
+      assertTrue(r.isEmpty());
+
+      r = t.get(new Get(ROW2));
+      assertEquals(Bytes.toString(VALUE2), Bytes.toString(r.getValue(FAMILY, QUALIFIER)));
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index c23bc7d..7816110 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -237,8 +237,8 @@ public class TestMalformedCellFromClient {
     builder.setAtomic(true);
     ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
     ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
-    ClientProtos.Condition condition = RequestConverter
-      .buildCondition(rm.getRow(), FAMILY, null, CompareOperator.EQUAL, new byte[10], null, null);
+    ClientProtos.Condition condition = ProtobufUtil.toCondition(rm.getRow(), FAMILY, null,
+      CompareOperator.EQUAL, new byte[10], null, null);
     for (Mutation mutation : rm.getMutations()) {
       ClientProtos.MutationProto.MutationType mutateType = null;
       if (mutation instanceof Put) {