You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/11/16 00:53:26 UTC

[phoenix] branch 4.14-HBase-1.4 updated (22e0238 -> 5688c5f)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a change to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


    from 22e0238  PHOENIX-5564 Restructure read repair to improve readability and correctness
     new da9f905  PHOENIX-5556 Avoid repeatedly loading IndexMetaData For IndexRegionObserver
     new da68fc8  PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes
     new 5688c5f  PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../phoenix/hbase/index/IndexRegionObserver.java   | 229 ++++++++++-----------
 .../hbase/index/builder/IndexBuildManager.java     |  19 +-
 2 files changed, 121 insertions(+), 127 deletions(-)


[phoenix] 03/03: PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 5688c5fabde4c6622512d2439d8e075bb16aae25
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Nov 7 15:50:40 2019 -0800

    PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 155 +++++++++++----------
 .../hbase/index/builder/IndexBuildManager.java     |  15 +-
 2 files changed, 84 insertions(+), 86 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index b058b33..340832f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,6 +32,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +69,7 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
@@ -145,16 +148,16 @@ public class IndexRegionObserver extends BaseRegionObserver {
       private final int clientVersion;
       // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
       // the verified column) will have the value false ("unverified") on these mutations
-      private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList();
+      private ListMultimap<HTableInterfaceReference, Mutation> preIndexUpdates;
       // The collection of index mutations that will be applied after the data table mutations. The empty column (i.e.,
       // the verified column) will have the value true ("verified") on the put mutations
-      private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
+      private ListMultimap<HTableInterfaceReference, Mutation> postIndexUpdates;
       // The collection of candidate index mutations that will be applied after the data table mutations
-      private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
+      private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> intermediatePostIndexUpdates;
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
-      long dataWriteStartTime;
-
+      private long dataWriteStartTime;
+      private boolean rebuild;
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
@@ -506,6 +509,27 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
+  private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                       MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                       ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) {
+      byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+      HTableInterfaceReference hTableInterfaceReference =
+                          new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
+      List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
+      if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
+          return;
+      }
+      List<Mutation> localUpdates = new ArrayList<Mutation>();
+      Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
+      while (indexUpdatesItr.hasNext()) {
+          Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+          localUpdates.add(next.getFirst());
+      }
+      if (!localUpdates.isEmpty()) {
+          miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
+      }
+  }
+
   private void prepareIndexMutations(
           ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp,
@@ -513,79 +537,56 @@ public class IndexRegionObserver extends BaseRegionObserver {
           Collection<? extends Mutation> mutations,
           long now,
           PhoenixIndexMetaData indexMetaData) throws Throwable {
-
       List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-
       // get the current span, or just use a null-span to avoid a bunch of if statements
       try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
           Span current = scope.getSpan();
           if (current == null) {
               current = NullSpan.INSTANCE;
           }
-
           // get the index updates for all elements in this batch
-          Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
-                  this.builder.getIndexUpdates(miniBatchOp, mutations, indexMetaData);
-
+          context.intermediatePostIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+          this.builder.getIndexUpdates(context.intermediatePostIndexUpdates, miniBatchOp, mutations, indexMetaData);
           current.addTimelineAnnotation("Built index updates, doing preStep");
-          TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-          byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
-          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
-          List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
-          context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
-          context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
-          while(indexUpdatesItr.hasNext()) {
-              Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
-              if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
-                  localUpdates.add(next.getFirst().getFirst());
-                  indexUpdatesItr.remove();
-              }
-              else {
-                  // get index maintainer for this index table
-                  IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
-                  if (indexMaintainer == null) {
-                      throw new DoNotRetryIOException(
-                              "preBatchMutateWithExceptions: indexMaintainer is null " +
-                                      c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-                  }
-                  byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-                  byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+          handleLocalIndexUpdates(c, miniBatchOp, context.intermediatePostIndexUpdates);
+          context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+          int updateCount = 0;
+          for (IndexMaintainer indexMaintainer : maintainers) {
+              updateCount++;
+              byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+              byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+              HTableInterfaceReference hTableInterfaceReference =
+                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+              Iterator<Pair<Mutation, byte[]>> indexUpdatesItr =
+                      context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
+              while (indexUpdatesItr.hasNext()) {
+                  Pair<Mutation, byte[]> next = indexUpdatesItr.next();
                   // add the VERIFIED cell, which is the empty cell
-                  Mutation m = next.getFirst().getFirst();
-                  boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
-                  if (rebuild) {
+                  Mutation m = next.getFirst();
+                  if (context.rebuild) {
+                      indexUpdatesItr.remove();
                       if (m instanceof Put) {
                           long ts = getMaxTimestamp(m);
                           // Remove the empty column prepared by Index codec as we need to change its value
                           removeEmptyColumn(m, emptyCF, emptyCQ);
-                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+                          ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
                       }
+                      context.preIndexUpdates.put(hTableInterfaceReference, m);
                   } else {
-                      indexUpdatesItr.remove();
                       // For this mutation whether it is put or delete, set the status of the index row "unverified"
                       // This will be done before the data table row is updated (i.e., in the first write phase)
                       Put unverifiedPut = new Put(m.getRow());
                       unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
-                      context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
+                      context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
                       if (m instanceof Put) {
                           // Remove the empty column prepared by Index codec as we need to change its value
                           removeEmptyColumn(m, emptyCF, emptyCQ);
                           ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
                       }
-                      context.intermediatePostIndexUpdates.add(next);
                   }
               }
           }
-          if (!localUpdates.isEmpty()) {
-              miniBatchOp.addOperationsFromCP(0,
-                      localUpdates.toArray(new Mutation[localUpdates.size()]));
-          }
-          if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
-              context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
-          }
-          for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
-              context.preIndexUpdates.add(update.getFirst());
-          }
+          TracingUtils.addAnnotation(current, "index update count", updateCount);
       }
   }
 
@@ -610,20 +611,23 @@ public class IndexRegionObserver extends BaseRegionObserver {
       setBatchMutateContext(c, context);
       Mutation firstMutation = miniBatchOp.getOperation(0);
       ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+      context.rebuild = replayWrite != null;
       /*
        * Exclusively lock all rows so we get a consistent read
        * while determining the index updates
        */
-      if (replayWrite == null) {
+      long now;
+      if (!context.rebuild) {
           populateRowsToLock(miniBatchOp, context);
           lockRows(context);
-      }
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
-      // concurrent updates
-      if (replayWrite == null) {
+          now = EnvironmentEdgeManager.currentTimeMillis();
+          // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+          // concurrent updates
           populatePendingRows(context);
       }
+      else {
+          now = EnvironmentEdgeManager.currentTimeMillis();
+      }
       // First group all the updates for a single row into a single update to be processed
       Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
       // early exit if it turns out we don't have any edits
@@ -646,9 +650,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
       for (RowLock rowLock : context.rowLocks) {
           rowLock.release();
       }
-      // Do the index updates
+      // Do the first phase index updates
       doPre(c, context, miniBatchOp);
-      if (replayWrite == null) {
+      context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+      if (!context.rebuild) {
+          List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
           // Acquire the locks again before letting the region proceed with data table updates
           List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
           for (RowLock rowLock : context.rowLocks) {
@@ -658,29 +664,26 @@ public class IndexRegionObserver extends BaseRegionObserver {
           context.rowLocks.clear();
           context.rowLocks = rowLocks;
           // Check if we need to skip post index update for any of the row
-          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> iterator = context.intermediatePostIndexUpdates.iterator();
-          while (iterator.hasNext()) {
-              // Check if this row is going through another mutation which has a newer timestamp. If so,
-              // ignore the pending updates for this row
-              Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
-              ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-              PendingRow pendingRow = pendingRows.get(rowKey);
-              // Are there concurrent updates on the data table row? if so, skip post index updates
-              // and let read repair resolve conflicts
-              if (pendingRow.isConcurrent()) {
-                  iterator.remove();
+          for (IndexMaintainer indexMaintainer : maintainers) {
+              HTableInterfaceReference hTableInterfaceReference =
+                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+              Iterator<Pair<Mutation, byte[]>> iterator =
+                      context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
+              while (iterator.hasNext()) {
+                  // Are there concurrent updates on the data table row? if so, skip post index updates
+                  // and let read repair resolve conflicts
+                  Pair<Mutation, byte[]> update = iterator.next();
+                  ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+                  PendingRow pendingRow = pendingRows.get(rowKey);
+                  if (!pendingRow.isConcurrent()) {
+                      context.postIndexUpdates.put(hTableInterfaceReference, update.getFirst());
+                  }
               }
           }
           // We are done with handling concurrent mutations. So we can remove the rows of this batch from
           // the collection of pending rows
           removePendingRows(context);
       }
-      if (context.postIndexUpdates.isEmpty() && !context.intermediatePostIndexUpdates.isEmpty()) {
-          context.postIndexUpdates = new ArrayList<>(context.intermediatePostIndexUpdates.size());
-      }
-      for (Pair<Pair<Mutation, byte[]>, byte[]> update : context.intermediatePostIndexUpdates) {
-          context.postIndexUpdates.add(update.getFirst());
-      }
       if (failDataTableUpdatesForTesting) {
           throw new DoNotRetryIOException("Simulating the data table write failure");
       }
@@ -758,7 +761,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
 
   private void doIndexWritesWithExceptions(BatchMutateContext context, boolean post)
             throws IOException {
-      Collection<Pair<Mutation, byte[]>> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
+      ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
       //short circuit, if we don't need to do any work
 
       if (context == null || indexUpdates.isEmpty()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 7639a49..90d28b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.collect.ListMultimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
@@ -34,6 +35,8 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +82,7 @@ public class IndexBuildManager implements Stoppable {
       return this.delegate.getIndexMetaData(miniBatchOp);
   }
 
-  public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
+  public void getIndexUpdates(ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates,
       MiniBatchOperationInProgress<Mutation> miniBatchOp,
       Collection<? extends Mutation> mutations,
       IndexMetaData indexMetaData) throws Throwable {
@@ -87,20 +90,12 @@ public class IndexBuildManager implements Stoppable {
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
-    ArrayList<Pair<Pair<Mutation, byte[]>, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
       Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
-      if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
-          for (Pair<Mutation, byte[]> update : updates) {
-            update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-          }
-      }
       for (Pair<Mutation, byte[]> update : updates) {
-        results.add(new Pair<>(update, m.getRow()));
+        indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
       }
     }
-    return results;
   }
 
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(


[phoenix] 02/03: PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit da68fc893d18872d16b4f30ac97dd6a642eac1fc
Author: Kadir <ko...@salesforce.com>
AuthorDate: Wed Nov 6 22:04:20 2019 -0800

    PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 51 ++++++++--------------
 1 file changed, 17 insertions(+), 34 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e8d9a05..b058b33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -104,19 +104,12 @@ public class IndexRegionObserver extends BaseRegionObserver {
    * Class to represent pending data table rows
    */
   private static class PendingRow {
-      private long latestTimestamp;
-      private long count;
+      private boolean concurrent = false;
+      private long count = 1;
 
-      PendingRow(long latestTimestamp) {
-          count = 1;
-          this.latestTimestamp = latestTimestamp;
-      }
-
-      public void add(long timestamp) {
+      public void add() {
           count++;
-          if (latestTimestamp < timestamp) {
-              latestTimestamp = timestamp;
-          }
+          concurrent = true;
       }
 
       public void remove() {
@@ -127,8 +120,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
           return count;
       }
 
-      public long getLatestTimestamp() {
-          return latestTimestamp;
+      public boolean isConcurrent() {
+          return concurrent;
       }
   }
 
@@ -159,10 +152,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
       // The collection of candidate index mutations that will be applied after the data table mutations
       private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-      // The set of row keys for the data table rows of this batch such that for each of these rows there exists another
-      // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the
-      // row (i.e., concurrent updates).
-      private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
       long dataWriteStartTime;
 
@@ -401,16 +390,15 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-  private void populatePendingRows(BatchMutateContext context, long now) {
+  private void populatePendingRows(BatchMutateContext context) {
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
           PendingRow pendingRow = pendingRows.get(rowKey);
           if (pendingRow == null) {
-              pendingRows.put(rowKey, new PendingRow(now));
+              pendingRows.put(rowKey, new PendingRow());
           } else {
               // m is a mutation on a row that has already a pending mutation in progress from another batch
-              pendingRow.add(now);
-              context.pendingRows.add(rowKey);
+              pendingRow.add();
           }
       }
   }
@@ -579,17 +567,12 @@ public class IndexRegionObserver extends BaseRegionObserver {
                       Put unverifiedPut = new Put(m.getRow());
                       unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
                       context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
-                      // Ignore post index updates (i.e., the third write phase updates) for this row if it is
-                      // going through concurrent updates
-                      ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
-                      if (!context.pendingRows.contains(rowKey)) {
-                          if (m instanceof Put) {
-                              // Remove the empty column prepared by Index codec as we need to change its value
-                              removeEmptyColumn(m, emptyCF, emptyCQ);
-                              ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
-                          }
-                          context.intermediatePostIndexUpdates.add(next);
+                      if (m instanceof Put) {
+                          // Remove the empty column prepared by Index codec as we need to change its value
+                          removeEmptyColumn(m, emptyCF, emptyCQ);
+                          ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
                       }
+                      context.intermediatePostIndexUpdates.add(next);
                   }
               }
           }
@@ -639,7 +622,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
       // concurrent updates
       if (replayWrite == null) {
-          populatePendingRows(context, now);
+          populatePendingRows(context);
       }
       // First group all the updates for a single row into a single update to be processed
       Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
@@ -682,9 +665,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
               Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
               ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
               PendingRow pendingRow = pendingRows.get(rowKey);
-              // Has any concurrent mutation arrived for the same row? if so, skip post index updates
+              // Are there concurrent updates on the data table row? if so, skip post index updates
               // and let read repair resolve conflicts
-              if (pendingRow.getLatestTimestamp() > now) {
+              if (pendingRow.isConcurrent()) {
                   iterator.remove();
               }
           }


[phoenix] 01/03: PHOENIX-5556 Avoid repeatedly loading IndexMetaData For IndexRegionObserver

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit da9f90559da5b3a980797aa90856b3f57ccb080e
Author: chenglei <ch...@apache.org>
AuthorDate: Thu Nov 7 10:29:05 2019 +0800

    PHOENIX-5556 Avoid repeatedly loading IndexMetaData For IndexRegionObserver
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 39 ++++++++++++++--------
 .../hbase/index/builder/IndexBuildManager.java     |  4 +--
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 83a54f6..e8d9a05 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -518,16 +518,15 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-  private void prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
-                                     MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context,
-                                     Collection<? extends Mutation> mutations, long now) throws Throwable {
-      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
-      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
-          throw new DoNotRetryIOException(
-                  "preBatchMutateWithExceptions: indexMetaData is not an instance of PhoenixIndexMetaData " +
-                          c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-      }
-      List<IndexMaintainer> maintainers = ((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers();
+  private void prepareIndexMutations(
+          ObserverContext<RegionCoprocessorEnvironment> c,
+          MiniBatchOperationInProgress<Mutation> miniBatchOp,
+          BatchMutateContext context,
+          Collection<? extends Mutation> mutations,
+          long now,
+          PhoenixIndexMetaData indexMetaData) throws Throwable {
+
+      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
 
       // get the current span, or just use a null-span to avoid a bunch of if statements
       try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -538,7 +537,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
 
           // get the index updates for all elements in this batch
           Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
-                  this.builder.getIndexUpdates(miniBatchOp, mutations);
+                  this.builder.getIndexUpdates(miniBatchOp, mutations, indexMetaData);
 
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
@@ -607,10 +606,24 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
+  protected PhoenixIndexMetaData getPhoenixIndexMetaData(
+          ObserverContext<RegionCoprocessorEnvironment> observerContext,
+          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+          throw new DoNotRetryIOException(
+                  "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
+                          ", current table is:" +
+                          observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+      }
+      return (PhoenixIndexMetaData)indexMetaData;
+  }
+
   public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
       ignoreAtomicOperations(miniBatchOp);
-      BatchMutateContext context = new BatchMutateContext(this.builder.getIndexMetaData(miniBatchOp).getClientVersion());
+      PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
+      BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
       setBatchMutateContext(c, context);
       Mutation firstMutation = miniBatchOp.getOperation(0);
       ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
@@ -636,7 +649,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
 
       long start = EnvironmentEdgeManager.currentTimeMillis();
-      prepareIndexMutations(c, miniBatchOp, context, mutations, now);
+      prepareIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
       metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
 
       // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 6b7e416..7639a49 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -81,9 +81,9 @@ public class IndexBuildManager implements Stoppable {
 
   public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
       MiniBatchOperationInProgress<Mutation> miniBatchOp,
-      Collection<? extends Mutation> mutations) throws Throwable {
+      Collection<? extends Mutation> mutations,
+      IndexMetaData indexMetaData) throws Throwable {
     // notify the delegate that we have started processing a batch
-    final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.