You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/11/07 19:52:04 UTC

[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new 69448a8  PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes
69448a8 is described below

commit 69448a845e843628558e17032287fe9e2bb6c5cf
Author: Kadir <ko...@salesforce.com>
AuthorDate: Wed Nov 6 22:04:20 2019 -0800

    PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 51 ++++++++--------------
 1 file changed, 17 insertions(+), 34 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e8d9a05..b058b33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -104,19 +104,12 @@ public class IndexRegionObserver extends BaseRegionObserver {
    * Class to represent pending data table rows
    */
   private static class PendingRow {
-      private long latestTimestamp;
-      private long count;
+      private boolean concurrent = false;
+      private long count = 1;
 
-      PendingRow(long latestTimestamp) {
-          count = 1;
-          this.latestTimestamp = latestTimestamp;
-      }
-
-      public void add(long timestamp) {
+      public void add() {
           count++;
-          if (latestTimestamp < timestamp) {
-              latestTimestamp = timestamp;
-          }
+          concurrent = true;
       }
 
       public void remove() {
@@ -127,8 +120,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
           return count;
       }
 
-      public long getLatestTimestamp() {
-          return latestTimestamp;
+      public boolean isConcurrent() {
+          return concurrent;
       }
   }
 
@@ -159,10 +152,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
       // The collection of candidate index mutations that will be applied after the data table mutations
       private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-      // The set of row keys for the data table rows of this batch such that for each of these rows there exists another
-      // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the
-      // row (i.e., concurrent updates).
-      private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
       long dataWriteStartTime;
 
@@ -401,16 +390,15 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-  private void populatePendingRows(BatchMutateContext context, long now) {
+  private void populatePendingRows(BatchMutateContext context) {
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
           PendingRow pendingRow = pendingRows.get(rowKey);
           if (pendingRow == null) {
-              pendingRows.put(rowKey, new PendingRow(now));
+              pendingRows.put(rowKey, new PendingRow());
           } else {
               // m is a mutation on a row that has already a pending mutation in progress from another batch
-              pendingRow.add(now);
-              context.pendingRows.add(rowKey);
+              pendingRow.add();
           }
       }
   }
@@ -579,17 +567,12 @@ public class IndexRegionObserver extends BaseRegionObserver {
                       Put unverifiedPut = new Put(m.getRow());
                       unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
                       context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
-                      // Ignore post index updates (i.e., the third write phase updates) for this row if it is
-                      // going through concurrent updates
-                      ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
-                      if (!context.pendingRows.contains(rowKey)) {
-                          if (m instanceof Put) {
-                              // Remove the empty column prepared by Index codec as we need to change its value
-                              removeEmptyColumn(m, emptyCF, emptyCQ);
-                              ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
-                          }
-                          context.intermediatePostIndexUpdates.add(next);
+                      if (m instanceof Put) {
+                          // Remove the empty column prepared by Index codec as we need to change its value
+                          removeEmptyColumn(m, emptyCF, emptyCQ);
+                          ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
                       }
+                      context.intermediatePostIndexUpdates.add(next);
                   }
               }
           }
@@ -639,7 +622,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
       // concurrent updates
       if (replayWrite == null) {
-          populatePendingRows(context, now);
+          populatePendingRows(context);
       }
       // First group all the updates for a single row into a single update to be processed
       Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
@@ -682,9 +665,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
               Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
               ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
               PendingRow pendingRow = pendingRows.get(rowKey);
-              // Has any concurrent mutation arrived for the same row? if so, skip post index updates
+              // Are there concurrent updates on the data table row? if so, skip post index updates
               // and let read repair resolve conflicts
-              if (pendingRow.getLatestTimestamp() > now) {
+              if (pendingRow.isConcurrent()) {
                   iterator.remove();
               }
           }