You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/08/14 07:17:33 UTC

[GitHub] [hbase] brfrn169 commented on a change in pull request #2228: HBASE-24602 Add Increment and Append support to CheckAndMutate

brfrn169 commented on a change in pull request #2228:
URL: https://github.com/apache/hbase/pull/2228#discussion_r470435585



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results)
+      throws IOException {
+      long now = EnvironmentEdgeManager.currentTime();
+      Map<byte[], List<Cell>> ret = new HashMap<>();
+      // Process a Store/family at a time.
+      for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
+        final byte[] columnFamilyName = entry.getKey();
+        List<Cell> deltas = entry.getValue();
+        // Reckon for the Store what to apply to WAL and MemStore.
+        List<Cell> toApply = reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+          now, deltas, results);
+        if (!toApply.isEmpty()) {
+          for (Cell cell : toApply) {
+            HStore store = region.getStore(cell);
+            if (store == null) {
+              region.checkFamily(CellUtil.cloneFamily(cell));
+            } else {
+              ret.computeIfAbsent(store.getColumnFamilyDescriptor().getName(),
+                key -> new ArrayList<>()).add(cell);
+            }
+          }
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed
+     * column family/Store.
+     *
+     * Does Get of current value and then adds passed in deltas for this Store returning the
+     * result.
+     *
+     * @param mutation The encompassing Mutation object
+     * @param deltas Changes to apply to this Store; either increment amount or data to append
+     * @param results In here we accumulate all the Cells we are to return to the client. If null,
+     *   client doesn't want results returned.
+     * @return Resulting Cells after <code>deltas</code> have been applied to current
+     *   values. Side effect is our filling out of the <code>results</code> List.
+     */
+    private List<Cell> reckonDeltasByStore(HStore store, Mutation mutation, long now,
+      List<Cell> deltas, List<Cell> results) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
+      List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
+
+      // Get previous values for all columns in this family.
+      TimeRange tr;
+      if (mutation instanceof Increment) {
+        tr = ((Increment) mutation).getTimeRange();
+      } else {
+        tr = ((Append) mutation).getTimeRange();
+      }
+      List<Cell> currentValues = get(mutation, store, deltas, tr);
+
+      // Iterate the input columns and update existing values if they were found, otherwise
+      // add new column initialized to the delta amount
+      int currentValuesIndex = 0;
+      for (int i = 0; i < deltas.size(); i++) {
+        Cell delta = deltas.get(i);
+        Cell currentValue = null;
+        if (currentValuesIndex < currentValues.size() &&
+          CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
+          currentValue = currentValues.get(currentValuesIndex);
+          if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
+            currentValuesIndex++;
+          }
+        }
+        // Switch on whether this an increment or an append building the new Cell to apply.
+        Cell newCell;
+        if (mutation instanceof Increment) {
+          long deltaAmount = getLongValue(delta);
+          final long newValue = currentValue == null ?
+            deltaAmount : getLongValue(currentValue) + deltaAmount;
+          newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
+            (oldCell) -> Bytes.toBytes(newValue));
+        } else {

Review comment:
       These lines are also existing code that was only moved. Will keep this. Thanks.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {

Review comment:
       Will change the name. This method is only for Increment and Append operations. It's called in the following if statement only:
   https://github.com/apache/hbase/pull/2228/files#diff-6205e907851ed4f650499f7111cbd91cR3799

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -742,36 +741,35 @@ private Result increment(final HRegion region, final OperationQuota quota,
     spaceQuota.getPolicyEnforcement(region).check(increment);
     quota.addMutation(increment);
     Result r = null;
-    if (region.getCoprocessorHost() != null) {
-      r = region.getCoprocessorHost().preIncrement(increment);
-    }
-    if (r == null) {
-      boolean canProceed = startNonceOperation(mutation, nonceGroup);
-      boolean success = false;
-      try {
-        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
-        if (canProceed) {
-          r = region.increment(increment, nonceGroup, nonce);
-        } else {
+    boolean canProceed = startNonceOperation(mutation, nonceGroup);

Review comment:
       Yes, I changed the semantics. This is because we call `preIncrement` in `region.increment()` after this change. I don't think this change breaks anything; it just changes the order of `preIncrement` and the nonce operation. What do you think?

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
##########
@@ -1282,6 +1438,80 @@ public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
     assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
   }
 
+  @Test
+  public void testCheckAndIncrementBatch() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
+
+    // CheckAndIncrement with correct value
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
+
+    // CheckAndIncrement with wrong value
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+      .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
+
+    List<CheckAndMutateResult> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();

Review comment:
       Not for now. I'm going to handle that case in [HBASE-24210](https://issues.apache.org/jira/browse/HBASE-24210).

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -742,36 +741,35 @@ private Result increment(final HRegion region, final OperationQuota quota,
     spaceQuota.getPolicyEnforcement(region).check(increment);
     quota.addMutation(increment);
     Result r = null;
-    if (region.getCoprocessorHost() != null) {
-      r = region.getCoprocessorHost().preIncrement(increment);
-    }
-    if (r == null) {
-      boolean canProceed = startNonceOperation(mutation, nonceGroup);
-      boolean success = false;
-      try {
-        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
-        if (canProceed) {
-          r = region.increment(increment, nonceGroup, nonce);
-        } else {
+    boolean canProceed = startNonceOperation(mutation, nonceGroup);

Review comment:
       Yes, I've changed the semantics. This is because we call `preIncrement` in `region.increment()` after this change. I don't think this change breaks anything; it just changes the order of `preIncrement` and the nonce operation. What do you think?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -2910,23 +2908,35 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
           }
 
           try {
-            CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
-              cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
-            regionActionResultBuilder.setProcessed(result.isSuccess());
             ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
               ClientProtos.ResultOrException.newBuilder();
-            for (int i = 0; i < regionAction.getActionCount(); i++) {
-              if (i == 0 && result.getResult() != null) {
-                resultOrExceptionOrBuilder.setIndex(i);
-                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
-                  .setResult(ProtobufUtil.toResult(result.getResult())).build());
-                continue;
+            if (regionAction.getActionCount() == 1) {
+              CheckAndMutateResult result = checkAndMutate(region, quota,
+                regionAction.getAction(0).getMutation(), cellScanner,
+                regionAction.getCondition(), spaceQuotaEnforcement);
+              regionActionResultBuilder.setProcessed(result.isSuccess());
+              resultOrExceptionOrBuilder.setIndex(0);
+              if (result.getResult() != null) {
+                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));

Review comment:
       Actually, this change supports a CheckAndMutate operation only with a single Increment/Append. That's why I needed the if statement to check whether there is only a single action. BTW, I'm going to handle CheckAndMutate operations with multiple Increments/Appends in [HBASE-24210](https://issues.apache.org/jira/browse/HBASE-24210).

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results)
+      throws IOException {
+      long now = EnvironmentEdgeManager.currentTime();
+      Map<byte[], List<Cell>> ret = new HashMap<>();
+      // Process a Store/family at a time.
+      for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
+        final byte[] columnFamilyName = entry.getKey();
+        List<Cell> deltas = entry.getValue();
+        // Reckon for the Store what to apply to WAL and MemStore.
+        List<Cell> toApply = reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+          now, deltas, results);
+        if (!toApply.isEmpty()) {
+          for (Cell cell : toApply) {

Review comment:
       These lines are also existing code that was only moved. Will keep this. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org