Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/08/12 16:14:00 UTC

[GitHub] [hbase] joshelser commented on a change in pull request #2228: HBASE-24602 Add Increment and Append support to CheckAndMutate

joshelser commented on a change in pull request #2228:
URL: https://github.com/apache/hbase/pull/2228#discussion_r469353813



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.

Review comment:
       nit: preferred

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {

Review comment:
       Maybe `doCoprocessorPreCallAfterRowLock()` and indicate that this method is a no-op for Mutations which do not have a `pre*AfterRowLock()` method in the javadoc?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results)

Review comment:
       What about `compute` or `calculate` instead of `reckon`? I had to go to a dictionary :)

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results)
+      throws IOException {
+      long now = EnvironmentEdgeManager.currentTime();
+      Map<byte[], List<Cell>> ret = new HashMap<>();
+      // Process a Store/family at a time.
+      for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
+        final byte[] columnFamilyName = entry.getKey();
+        List<Cell> deltas = entry.getValue();
+        // Reckon for the Store what to apply to WAL and MemStore.
+        List<Cell> toApply = reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+          now, deltas, results);
+        if (!toApply.isEmpty()) {
+          for (Cell cell : toApply) {

Review comment:
       Genuine question, will this save us anything? Not sure how the JIT will (or won't) optimize such a thing away. I guess, at a minimum, it would save construction of an Iterator object?
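For reference, a minimal standalone sketch of what the `isEmpty()` guard skips at the source level. `CountingList` is a hypothetical helper made up for this illustration, not anything in HBase. It only counts `iterator()` calls, so it says nothing about what the JIT does with the allocation after escape analysis.

```java
import java.util.ArrayList;
import java.util.Iterator;

public class EmptyGuardSketch {
  // Hypothetical list that records how often a for-each loop asks for an Iterator.
  static class CountingList<E> extends ArrayList<E> {
    int iteratorCalls = 0;

    @Override
    public Iterator<E> iterator() {
      iteratorCalls++;
      return super.iterator();
    }
  }

  // Same shape as the code under review: check isEmpty() before looping.
  static int sumGuarded(CountingList<Integer> list) {
    int sum = 0;
    if (!list.isEmpty()) {
      for (int v : list) {
        sum += v;
      }
    }
    return sum;
  }

  // Loop directly; an empty list still goes through iterator().
  static int sumUnguarded(CountingList<Integer> list) {
    int sum = 0;
    for (int v : list) {
      sum += v;
    }
    return sum;
  }

  public static void main(String[] args) {
    CountingList<Integer> guarded = new CountingList<>();
    CountingList<Integer> unguarded = new CountingList<>();
    sumGuarded(guarded);
    sumUnguarded(unguarded);
    System.out.println("guarded iterator() calls: " + guarded.iteratorCalls);
    System.out.println("unguarded iterator() calls: " + unguarded.iteratorCalls);
  }
}
```

Both methods return the same sum; the only difference on an empty list is whether `iterator()` is called at all.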

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
##########
@@ -1282,6 +1438,80 @@ public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
     assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
   }
 
+  @Test
+  public void testCheckAndIncrementBatch() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
+
+    // CheckAndIncrement with correct value
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
+
+    // CheckAndIncrement with wrong value
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+      .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
+
+    List<CheckAndMutateResult> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();

Review comment:
       Can I send multiple `CheckAndMutate`s (each with its own Increment or Append) to the same row in one batch?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results)

Review comment:
       I see now that this is existing code being moved. Better to keep the naming.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results)
+      throws IOException {
+      long now = EnvironmentEdgeManager.currentTime();
+      Map<byte[], List<Cell>> ret = new HashMap<>();
+      // Process a Store/family at a time.
+      for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
+        final byte[] columnFamilyName = entry.getKey();
+        List<Cell> deltas = entry.getValue();
+        // Reckon for the Store what to apply to WAL and MemStore.
+        List<Cell> toApply = reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+          now, deltas, results);
+        if (!toApply.isEmpty()) {
+          for (Cell cell : toApply) {
+            HStore store = region.getStore(cell);
+            if (store == null) {
+              region.checkFamily(CellUtil.cloneFamily(cell));
+            } else {
+              ret.computeIfAbsent(store.getColumnFamilyDescriptor().getName(),
+                key -> new ArrayList<>()).add(cell);
+            }
+          }
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed
+     * column family/Store.
+     *
+     * Does Get of current value and then adds passed in deltas for this Store returning the
+     * result.
+     *
+     * @param mutation The encompassing Mutation object
+     * @param deltas Changes to apply to this Store; either increment amount or data to append
+     * @param results In here we accumulate all the Cells we are to return to the client. If null,
+     *   client doesn't want results returned.
+     * @return Resulting Cells after <code>deltas</code> have been applied to current
+     *   values. Side effect is our filling out of the <code>results</code> List.
+     */
+    private List<Cell> reckonDeltasByStore(HStore store, Mutation mutation, long now,
+      List<Cell> deltas, List<Cell> results) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
+      List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
+
+      // Get previous values for all columns in this family.
+      TimeRange tr;
+      if (mutation instanceof Increment) {
+        tr = ((Increment) mutation).getTimeRange();
+      } else {
+        tr = ((Append) mutation).getTimeRange();
+      }
+      List<Cell> currentValues = get(mutation, store, deltas, tr);
+
+      // Iterate the input columns and update existing values if they were found, otherwise
+      // add new column initialized to the delta amount
+      int currentValuesIndex = 0;
+      for (int i = 0; i < deltas.size(); i++) {
+        Cell delta = deltas.get(i);
+        Cell currentValue = null;
+        if (currentValuesIndex < currentValues.size() &&
+          CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
+          currentValue = currentValues.get(currentValuesIndex);
+          if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
+            currentValuesIndex++;
+          }
+        }
+        // Switch on whether this an increment or an append building the new Cell to apply.
+        Cell newCell;
+        if (mutation instanceof Increment) {
+          long deltaAmount = getLongValue(delta);
+          final long newValue = currentValue == null ?
+            deltaAmount : getLongValue(currentValue) + deltaAmount;
+          newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
+            (oldCell) -> Bytes.toBytes(newValue));
+        } else {

Review comment:
       Better to check that it's an Append and throw an exception if it isn't?
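A rough sketch of the explicit check, using hypothetical stand-in classes rather than the real HBase `Mutation` types. The motivation is that Java `assert` statements are disabled unless the JVM runs with `-ea`, so a bare `else` would silently treat any other mutation as an Append. The choice of `IllegalArgumentException` here is only an assumption for the sketch; which exception type fits `HRegion` is left open.

```java
public class MutationTypeCheckSketch {
  // Hypothetical stand-ins for the HBase client classes of the same names.
  static class Mutation {}
  static class Increment extends Mutation {}
  static class Append extends Mutation {}
  static class Put extends Mutation {}

  // Branch on the concrete type and fail loudly on anything unexpected,
  // instead of letting a bare else assume "not Increment" means Append.
  static String describe(Mutation mutation) {
    if (mutation instanceof Increment) {
      return "increment";
    } else if (mutation instanceof Append) {
      return "append";
    } else {
      throw new IllegalArgumentException(
        "Unsupported mutation type: " + mutation.getClass().getSimpleName());
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(new Increment()));
    System.out.println(describe(new Append()));
    try {
      describe(new Put());
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```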

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> results)

Review comment:
       Javadoc here would also be great to supplement `reckonDeltasByStore`'s javadoc.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -742,36 +741,35 @@ private Result increment(final HRegion region, final OperationQuota quota,
     spaceQuota.getPolicyEnforcement(region).check(increment);
     quota.addMutation(increment);
     Result r = null;
-    if (region.getCoprocessorHost() != null) {
-      r = region.getCoprocessorHost().preIncrement(increment);
-    }
-    if (r == null) {
-      boolean canProceed = startNonceOperation(mutation, nonceGroup);
-      boolean success = false;
-      try {
-        long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
-        if (canProceed) {
-          r = region.increment(increment, nonceGroup, nonce);
-        } else {
+    boolean canProceed = startNonceOperation(mutation, nonceGroup);

Review comment:
       You've changed the semantics here. Before, we would call `preIncrement` and only then start a new nonce'd operation. Now we always start a new nonce operation, even if the coprocessor (CP) is about to say "skip this increment".
   
   Is that intentional?
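To make the ordering concern concrete, here is a simplified model of the two call sequences as described in this comment. It is not the `RSRpcServices` code: the method names are recorded as plain strings, the `canProceed` handling is left out, and the position of the coprocessor hook in the new flow is an assumption taken from the comment rather than from the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class NonceOrderingSketch {
  // Old flow: the pre-hook runs first, and a nonce operation is started
  // only when the hook does not bypass the increment.
  static List<String> oldFlow(boolean hookBypasses) {
    List<String> calls = new ArrayList<>();
    calls.add("preIncrement");
    if (!hookBypasses) {
      calls.add("startNonceOperation");
      calls.add("increment");
    }
    return calls;
  }

  // New flow (as modelled here): the nonce operation is started up front,
  // so it exists even when the hook later bypasses the increment.
  static List<String> newFlow(boolean hookBypasses) {
    List<String> calls = new ArrayList<>();
    calls.add("startNonceOperation");
    calls.add("preIncrement");
    if (!hookBypasses) {
      calls.add("increment");
    }
    return calls;
  }

  public static void main(String[] args) {
    System.out.println("old, bypass: " + oldFlow(true));
    System.out.println("new, bypass: " + newFlow(true));
  }
}
```

In the bypass case the old flow never touches the nonce machinery, while the new one does; that difference is what the question above is about.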

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -2910,23 +2908,35 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
           }
 
           try {
-            CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
-              cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
-            regionActionResultBuilder.setProcessed(result.isSuccess());
             ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
               ClientProtos.ResultOrException.newBuilder();
-            for (int i = 0; i < regionAction.getActionCount(); i++) {
-              if (i == 0 && result.getResult() != null) {
-                resultOrExceptionOrBuilder.setIndex(i);
-                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
-                  .setResult(ProtobufUtil.toResult(result.getResult())).build());
-                continue;
+            if (regionAction.getActionCount() == 1) {
+              CheckAndMutateResult result = checkAndMutate(region, quota,
+                regionAction.getAction(0).getMutation(), cellScanner,
+                regionAction.getCondition(), spaceQuotaEnforcement);
+              regionActionResultBuilder.setProcessed(result.isSuccess());
+              resultOrExceptionOrBuilder.setIndex(0);
+              if (result.getResult() != null) {
+                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));

Review comment:
       Is it faster to special-case a single action? What does this get us over the previous approach of using the "collection of Actions" method even when there is only one action?



