Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2021/05/03 21:55:50 UTC

[GitHub] [phoenix] gjacoby126 commented on a change in pull request #1215: PHOENIX-6387 Conditional updates on tables with indexes

gjacoby126 commented on a change in pull request #1215:
URL: https://github.com/apache/phoenix/pull/1215#discussion_r624274587



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
##########
@@ -456,6 +456,42 @@ public void testTenantViewUpsertWithIndex() throws Exception {
         tenantViewHelper(true);
     }
 
+    @Test
+    public void testOnDuplicateUpsertWithIndex() throws Exception {
+        if (this.isImmutable) {

Review comment:
       nit: can be Assume.assumeFalse(this.isImmutable)
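      A minimal sketch of what that could look like (assuming JUnit 4's org.junit.Assume is available on the test classpath):

          import org.junit.Assume;

          @Test
          public void testOnDuplicateUpsertWithIndex() throws Exception {
              // Reports the test as skipped for immutable tables instead of
              // silently passing via an early return inside an if-block.
              Assume.assumeFalse(this.isImmutable);
              // ... existing test body ...
          }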

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1014,17 +1166,47 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
         }
     }
 
+    /**
+     * In the case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will
+     * be generated, so release the row lock.

Review comment:
       nice optimization
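      For context, a rough sketch of the client statement that exercises this path (table and column names are made up; assumes the usual java.sql imports and a Phoenix JDBC URL):

          // With ON DUPLICATE KEY IGNORE, an existing row generates no
          // mutations at all, so holding its lock any longer is pointless.
          try (Connection conn = DriverManager.getConnection(jdbcUrl);
               Statement stmt = conn.createStatement()) {
              stmt.execute("UPSERT INTO T (ID, VAL) VALUES (1, 'a') "
                      + "ON DUPLICATE KEY IGNORE");
              conn.commit();
          }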

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -926,57 +1051,84 @@ private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext
                // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed to by lastContext
                 if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
                         TimeUnit.MILLISECONDS)) {
+                    LOG.debug(String.format("latch timeout context %s last %s", context, lastContext));
                     done = false;
-                    break;
                 }
                 // Acquire the locks again before letting the region proceed with data table updates
                 lockRows(context);
+                if (!done) {
+                    // previous concurrent batch did not complete so we have to retry this batch
+                    break;
+                } else {
+                    // read the phase again to determine the status of previous batch
+                    phase = lastContext.getCurrentPhase();
+                    LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext, phase));

Review comment:
      what does the toString() of a batch mutate context produce? If it outputs the actual mutations, this can be a problem for environments where logs aren't allowed to contain certain kinds of data.
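      If it does, one option is a toString() that reports only metadata, never cell contents. A rough sketch (the field names here are hypothetical, not the actual BatchMutateContext members):

          @Override
          public String toString() {
              // Log counts and the current phase only; never serialize the
              // mutations themselves, so cell data cannot leak into server logs.
              return "BatchMutateContext{pendingRows=" + rowsToLock.size()
                      + ", phase=" + getCurrentPhase() + "}";
          }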

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -476,6 +500,53 @@ private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatch
       }
   }
 
+    /**
+     * Add the mutations generated by the ON DUPLICATE KEY UPDATE clause to the current batch.
+     * MiniBatchOperationInProgress#addOperationsFromCP() allows coprocessors to attach additional mutations

Review comment:
       Thanks for the good explanation. 
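      For anyone following along, the hook usage looks roughly like this (a sketch; the variable names are illustrative):

          // Attach the mutations generated for the ON DUPLICATE KEY clause to
          // the i-th operation of the mini batch; HBase then applies them
          // together with the rest of the batch.
          miniBatchOp.addOperationsFromCP(i, mutations.toArray(new Mutation[0]));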

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
       properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
       desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase, but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE if the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally

Review comment:
       What if ON DUPLICATE KEY UPDATE does something that would generate a Delete mutation (like setting something to NULL)?
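      e.g. something like the following (a made-up table; sketch only), which nulls out a column on the existing row and so should surface as a DeleteColumn cell rather than a Put:

          stmt.execute("UPSERT INTO T (ID, VAL) VALUES (1, 'a') "
                  + "ON DUPLICATE KEY UPDATE VAL = null");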

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -885,8 +1001,7 @@ private void preparePostIndexMutations(BatchMutateContext context,
                 }
             }
         }
-        removePendingRows(context);
-        context.indexUpdates.clear();
+        // all cleanup will be done in postBatchMutateIndispensably()

Review comment:
       thanks for the helpful comment

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1138,10 +1329,10 @@ private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateC
           metricSource.updatePreIndexUpdateFailureTime(dataTableName,
               EnvironmentEdgeManager.currentTimeMillis() - start);
           metricSource.incrementPreIndexUpdateFailures(dataTableName);
-          // Remove all locks as they are already unlocked. There is no need to unlock them again later when
-          // postBatchMutateIndispensably() is called
-          removePendingRows(context);
-          context.rowLocks.clear();
+          // Re-acquire all locks since we released them before making index updates
+          // Removal of reference counts and locks for the rows of this batch will be

Review comment:
      I don't follow why we want to relock rows here rather than later when we retry (if we retry). If this is the last retry and we fail, when do the rows get unlocked?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -520,29 +591,48 @@ private void populatePendingRows(BatchMutateContext context) {
                     context.multiMutationMap.put(row, stored);
                 }
                 stored.addAll(m);
+                Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);

Review comment:
       do we need to check || isAtomic(m) on line 585, or do we only group if we actually have an index? 

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
       properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
       desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase, but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE if the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set to null).
+     */
+  private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+      List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+      byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+      if (opBytes == null) { // Unexpected
+          return null;
+      }
+      Put put = null;
+      Delete delete = null;
+      // We can neither use the time stamp in the Increment to set the Get time range
+      // nor set the Put/Delete time stamp and have this be atomic, as HBase does not
+      // handle that. Though we disallow using the ON DUPLICATE KEY clause when
+      // CURRENT_SCN is set, we still may have a time stamp set as of when the table
+      // was resolved on the client side. We need to ignore this as well due to limitations
+      // in HBase, but this isn't too bad as the time will be very close to the current
+      // time anyway.
+      long ts = HConstants.LATEST_TIMESTAMP;
+
+      byte[] rowKey = atomicPut.getRow();
+      ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(rowKey);
+      // Get the latest data row state
+      Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+      Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() : null;
+
+      if (PhoenixIndexBuilder.isDupKeyIgnore(opBytes)) {
+          if (currentDataRowState == null) {
+              // new row
+              mutations.add(atomicPut);
+          }
+          return mutations;
+      }
+
+      ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
+      DataInputStream input = new DataInputStream(stream);
+      boolean skipFirstOp = input.readBoolean();
+      short repeat = input.readShort();
+      final int[] estimatedSizeHolder = {0};
+      List<Pair<PTable, List<Expression>>> operations = Lists.newArrayListWithExpectedSize(3);
+
+      // store the columns that need to be read in the conditional expressions
+      final Set<ColumnReference> colsReadInExpr = new HashSet<>();
+      while (true) {
+          ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
+              @Override

Review comment:
      nit: good place for an Extract Method, since the logic is complex and isn't needed to follow the flow of the larger method. Also good to add an extra comment explaining the column parsing.
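      Something like the following, perhaps (a rough sketch; the helper name and exact signature are invented):

          /**
           * Collects the columns referenced by the ON DUPLICATE KEY expressions
           * so the current row state can be projected onto exactly those columns.
           */
          private void collectColumnsReadInExpressions(List<Expression> expressions,
                  Set<ColumnReference> colsReadInExpr) {
              ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
                  @Override
                  public Void visit(KeyValueColumnExpression expression) {
                      colsReadInExpr.add(new ColumnReference(
                              expression.getColumnFamily(), expression.getColumnQualifier()));
                      return null;
                  }
              };
              for (Expression expression : expressions) {
                  expression.accept(visitor);
              }
          }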

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
       properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
       desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase, but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE if the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally
+     * one Delete mutation (with DeleteColumn type cells for all columns set to null).
+     */
+  private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put atomicPut) throws IOException {
+      List<Mutation> mutations = Lists.newArrayListWithExpectedSize(2);
+      byte[] opBytes = atomicPut.getAttribute(ATOMIC_OP_ATTRIB);
+      if (opBytes == null) { // Unexpected
+          return null;
+      }
+      Put put = null;
+      Delete delete = null;
+      // We can neither use the time stamp in the Increment to set the Get time range

Review comment:
      There's no longer an Increment, right?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
##########
@@ -1165,4 +1356,255 @@ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBu
       properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
       desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
\ No newline at end of file
+
+    /**
+     * This function has been adapted from PhoenixIndexBuilder#executeAtomicOp().
+     * The critical difference is that the code in PhoenixIndexBuilder#executeAtomicOp()
+     * generates the mutations by reading the latest data table row from HBase, but in order
+     * to correctly support concurrent index mutations we need to always read the latest
+     * data table row from memory.
+     * It takes in an atomic Put mutation and generates a list of Put and Delete mutations.
+     * The list will be empty in the case of ON DUPLICATE KEY IGNORE if the row already exists.
+     * In the case of ON DUPLICATE KEY UPDATE, we will generate one Put mutation and optionally

Review comment:
       Looks like testDeleteOnSingleLowerCaseVarcharColumn already tests for this, so we're ok?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org