You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/10/13 03:40:55 UTC

[phoenix] branch 4.x updated: PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new f6a5b93  PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
f6a5b93 is described below

commit f6a5b939b08cdceb995f9fa166e25741c762bad8
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Sun Sep 27 15:59:20 2020 -0700

    PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 267 +++++++++++++--------
 1 file changed, 167 insertions(+), 100 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index edc281e..74b8088 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -71,7 +71,6 @@ import org.apache.phoenix.hbase.index.LockManager.RowLock;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
@@ -91,6 +90,8 @@ import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
@@ -117,24 +118,29 @@ public class IndexRegionObserver extends BaseRegionObserver {
    * Class to represent pending data table rows
    */
   private static class PendingRow {
-      private boolean concurrent = false;
-      private long count = 1;
+      private int count;
+      private BatchMutateContext lastContext;
 
-      public void add() {
+      PendingRow(BatchMutateContext context) {
+          count = 1;
+          lastContext = context;
+      }
+
+      public void add(BatchMutateContext context) {
           count++;
-          concurrent = true;
+          lastContext = context;
       }
 
       public void remove() {
           count--;
       }
 
-      public long getCount() {
+      public int getCount() {
           return count;
       }
 
-      public boolean isConcurrent() {
-          return concurrent;
+      public BatchMutateContext getLastContext() {
+          return lastContext;
       }
   }
 
@@ -150,9 +156,25 @@ public class IndexRegionObserver extends BaseRegionObserver {
       failDataTableUpdatesForTesting = fail;
   }
 
+  public enum BatchMutatePhase {
+      PRE, POST, FAILED
+  }
+
   // Hack to get around not being able to save any state between
   // coprocessor calls. TODO: remove after HBASE-18127 when available
+
+  /**
+   * The concurrent batch of mutations is a set such that every pair of batches in this set has at least one common row.
+   * Since a BatchMutateContext object of a batch is modified only after the row locks for all the rows that are mutated
+   * by this batch are acquired, there can be only one thread can acquire the locks for its batch and safely access
+   * all the batch contexts in the set of concurrent batches. Because of this, we do not read atomic variables or
+   * additional locks to serialize the access to the BatchMutateContext objects.
+   */
+
   private static class BatchMutateContext {
+      private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+      // The max of reference counts on the pending rows of this batch at the time this batch arrives
+      private int maxPendingRowCount = 0;
       private final int clientVersion;
       // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
       // the verified column) will have the value false ("unverified") on these mutations
@@ -166,12 +188,40 @@ public class IndexRegionObserver extends BaseRegionObserver {
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
       // The current and next states of the data rows corresponding to the pending mutations
       private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
-      // Data table pending mutations
+      // The previous concurrent batch contexts
+      private HashMap<ImmutableBytesPtr, BatchMutateContext> lastConcurrentBatchContext = null;
+      // The latches of the threads waiting for this batch to complete
+      private List<CountDownLatch> waitList = null;
       private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
 
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
+
+      public BatchMutatePhase getCurrentPhase() {
+          return currentPhase;
+      }
+
+      public Put getNextDataRowState(ImmutableBytesPtr rowKeyPtr) {
+          Pair<Put, Put> rowState = dataRowStates.get(rowKeyPtr);
+          if (rowState != null) {
+              return rowState.getSecond();
+          }
+          return null;
+      }
+
+      public CountDownLatch getCountDownLatch() {
+          if (waitList == null) {
+              waitList = new ArrayList<>();
+          }
+          CountDownLatch countDownLatch = new CountDownLatch(1);
+          waitList.add(countDownLatch);
+          return countDownLatch;
+      }
+
+      public int getMaxPendingRowCount() {
+          return maxPendingRowCount;
+      }
   }
 
   private ThreadLocal<BatchMutateContext> batchMutateContext =
@@ -211,9 +261,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
   private long slowIndexPrepareThreshold;
   private long slowPreIncrementThreshold;
   private int rowLockWaitDuration;
+  private int concurrentMutationWaitDuration;
   private String dataTableName;
 
   private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+  private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
 
   @Override
   public void start(CoprocessorEnvironment e) throws IOException {
@@ -245,6 +297,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
           this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
                 DEFAULT_ROWLOCK_WAIT_DURATION);
           this.lockManager = new LockManager();
+          this.concurrentMutationWaitDuration = env.getConfiguration().getInt("phoenix.index.concurrent.wait.duration.ms",
+                  DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS);
 
           // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
           this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
@@ -396,15 +450,22 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
+    private void unlockRows(BatchMutateContext context) throws IOException {
+        for (RowLock rowLock : context.rowLocks) {
+            rowLock.release();
+        }
+        context.rowLocks.clear();
+    }
+
   private void populatePendingRows(BatchMutateContext context) {
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
           PendingRow pendingRow = pendingRows.get(rowKey);
           if (pendingRow == null) {
-              pendingRows.put(rowKey, new PendingRow());
+              pendingRows.put(rowKey, new PendingRow(context));
           } else {
               // m is a mutation on a row that has already a pending mutation in progress from another batch
-              pendingRow.add();
+              pendingRow.add(context);
           }
       }
   }
@@ -593,15 +654,34 @@ public class IndexRegionObserver extends BaseRegionObserver {
     private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
                                      BatchMutateContext context) throws IOException {
         Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
         for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+            PendingRow pendingRow = pendingRows.get(rowKeyPtr);
+            if (pendingRow != null && pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
+                if (context.lastConcurrentBatchContext == null) {
+                    context.lastConcurrentBatchContext = new HashMap<>();
+                }
+                context.lastConcurrentBatchContext.put(rowKeyPtr, pendingRow.getLastContext());
+                if (context.maxPendingRowCount < pendingRow.getCount()) {
+                    context.maxPendingRowCount = pendingRow.getCount();
+                }
+                Put put = pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
+                if (put != null) {
+                    context.dataRowStates.put(rowKeyPtr, new Pair<Put, Put>(put, new Put(put)));
+                }
+            }
+            else {
+                keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+            }
+        }
+        if (keys.isEmpty()) {
+            return;
         }
         Scan scan = new Scan();
         ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
         scanRanges.initializeScan(scan);
         SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
         scan.setFilter(skipScanFilter);
-        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
         try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
             boolean more = true;
             while(more) {
@@ -750,43 +830,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
         return (PhoenixIndexMetaData)indexMetaData;
     }
 
-    /**
-     * IndexMaintainer.getIndexedColumns() returns the data column references for indexed columns. The data columns are
-     * grouped into three classes, pk columns (data table pk columns), the indexed columns (the columns for which
-     * we want to have indexing; they form the prefix for the primary key for the index table (after salt and tenant id))
-     * and covered columns. The purpose of this method is to find out if all the indexed columns are included in the
-     * pending data table mutation pointed by multiMutation.
-     */
-    private boolean hasAllIndexedColumns(IndexMaintainer indexMaintainer, MultiMutation multiMutation) {
-        Map<byte[], List<Cell>> familyMap = multiMutation.getFamilyCellMap();
-        for (ColumnReference columnReference : indexMaintainer.getIndexedColumns()) {
-            byte[] family = columnReference.getFamily();
-            List<Cell> cellList = familyMap.get(family);
-            if (cellList == null) {
-                return false;
-            }
-            boolean has = false;
-            for (Cell cell : cellList) {
-                if (CellUtil.matchingColumn(cell, family, columnReference.getQualifier())) {
-                    has = true;
-                    break;
-                }
-            }
-            if (!has) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private void preparePostIndexMutations(TableName table,
-                                           BatchMutateContext context,
+    private void preparePostIndexMutations(BatchMutateContext context,
                                            long now,
-                                           PhoenixIndexMetaData indexMetaData)
-            throws Throwable {
+                                           PhoenixIndexMetaData indexMetaData) {
         context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
         List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-        // Check if we need to skip post index update for any of the rows
         for (IndexMaintainer indexMaintainer : maintainers) {
             byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
             byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
@@ -794,43 +842,17 @@ public class IndexRegionObserver extends BaseRegionObserver {
                     new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
             List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
             for (Pair<Mutation, byte[]> update : updates) {
-                // Are there concurrent updates on the data table row? if so, skip post index updates
-                // and let read repair resolve conflicts
-                ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-                PendingRow pendingRow = pendingRows.get(rowKey);
-                if (!pendingRow.isConcurrent()) {
-                    Mutation m = update.getFirst();
-                    if (m instanceof Put) {
-                        Put verifiedPut = new Put(m.getRow());
-                        // Set the status of the index row to "verified"
-                        verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
-                        context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
-                    } else {
-                        context.postIndexUpdates.put(hTableInterfaceReference, m);
-                    }
+                Mutation m = update.getFirst();
+                if (m instanceof Put) {
+                    Put verifiedPut = new Put(m.getRow());
+                    // Set the status of the index row to "verified"
+                    verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+                    context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
                 } else {
-                    if (!hasAllIndexedColumns(indexMaintainer, context.multiMutationMap.get(rowKey))) {
-                        // This batch needs to be retried since one of the concurrent mutations does not have the value
-                        // for an indexed column. Not including an index column may lead to incorrect index row key
-                        // generation for concurrent mutations since concurrent mutations are not serialized entirely
-                        // and do not see each other's effect on data table. Throwing an IOException will result in
-                        // retries of this batch. Before throwing exception, we need to remove reference counts and
-                        // locks for the rows of this batch
-                        removePendingRows(context);
-                        context.indexUpdates.clear();
-                        for (RowLock rowLock : context.rowLocks) {
-                            rowLock.release();
-                        }
-                        context.rowLocks.clear();
-                        throw new IOException("One of the concurrent mutations does not have all indexed columns. " +
-                                "The batch needs to be retried " + table.getNameAsString());
-                    }
+                    context.postIndexUpdates.put(hTableInterfaceReference, m);
                 }
             }
         }
-
-        // We are done with handling concurrent mutations. So we can remove the rows of this batch from
-        // the collection of pending rows
         removePendingRows(context);
         context.indexUpdates.clear();
     }
@@ -853,41 +875,76 @@ public class IndexRegionObserver extends BaseRegionObserver {
         return false;
     }
 
+    private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
+            throws Throwable {
+        boolean done;
+        BatchMutatePhase phase;
+        done = true;
+        for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
+            phase = lastContext.getCurrentPhase();
+            if (phase == BatchMutatePhase.FAILED) {
+                done = false;
+                break;
+            }
+            if (phase == BatchMutatePhase.PRE) {
+                CountDownLatch countDownLatch = lastContext.getCountDownLatch();
+                // Release the locks so that the previous concurrent mutation can go into the post phase
+                unlockRows(context);
+                // Wait for at most one concurrentMutationWaitDuration for each level in the dependency tree of batches.
+                // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed by lastContext
+                if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
+                        TimeUnit.MILLISECONDS)) {
+                    done = false;
+                    break;
+                }
+                // Acquire the locks again before letting the region proceed with data table updates
+                lockRows(context);
+            }
+        }
+        if (!done) {
+            // This batch needs to be retried since one of the previous concurrent batches has not completed yet.
+            // Throwing an IOException will result in retries of this batch. Before throwing exception,
+            // we need to remove reference counts and locks for the rows of this batch
+            removePendingRows(context);
+            context.indexUpdates.clear();
+            for (RowLock rowLock : context.rowLocks) {
+                rowLock.release();
+            }
+            context.rowLocks.clear();
+            throw new IOException("One of the previous concurrent mutations has not completed. " +
+                    "The batch needs to be retried " + table.getNameAsString());
+        }
+    }
+
     public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
                                              MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
         ignoreAtomicOperations(miniBatchOp);
         PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
         BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
         setBatchMutateContext(c, context);
-        Mutation firstMutation = miniBatchOp.getOperation(0);
 
         /*
          * Exclusively lock all rows so we get a consistent read
          * while determining the index updates
          */
         populateRowsToLock(miniBatchOp, context);
+        // early exit if it turns out we don't have any update for indexes
+        if (context.rowsToLock.isEmpty()) {
+            return;
+        }
         lockRows(context);
-
         long now = EnvironmentEdgeManager.currentTimeMillis();
-        // Unless we're replaying edits to rebuild the index, we update the time stamp
-        // of the data table to prevent overlapping time stamps (which prevents index
+        // Update the timestamps of the data table mutations to prevent overlapping timestamps (which prevents index
         // inconsistencies as this case isn't handled correctly currently).
         setTimestamps(miniBatchOp, builder, now);
 
-        // Group all the updates for a single row into a single update to be processed (for local indexes, and global index retries)
-        Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
-        // early exit if it turns out we don't have any edits
-        if (mutations == null || mutations.isEmpty()) {
-            return;
-        }
-
         TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
         if (hasGlobalIndex(indexMetaData)) {
+            // Prepare current and next data rows states for pending mutations (for global indexes)
+            prepareDataRowStates(c, miniBatchOp, context, now);
             // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
             // concurrent updates
             populatePendingRows(context);
-            // Prepare current and next data rows states for pending mutations (for global indexes)
-            prepareDataRowStates(c, miniBatchOp, context, now);
             // early exit if it turns out we don't have any edits
             long start = EnvironmentEdgeManager.currentTimeMillis();
             preparePreIndexMutations(context, now, indexMetaData);
@@ -901,17 +958,19 @@ public class IndexRegionObserver extends BaseRegionObserver {
                 LOG.debug("slept 1ms for " + table.getNameAsString());
             }
             // Release the locks before making RPC calls for index updates
-            for (RowLock rowLock : context.rowLocks) {
-                rowLock.release();
-            }
+            unlockRows(context);
             // Do the first phase index updates
             doPre(c, context, miniBatchOp);
             // Acquire the locks again before letting the region proceed with data table updates
-            context.rowLocks.clear();
             lockRows(context);
-            preparePostIndexMutations(table, context, now, indexMetaData);
+            if (context.lastConcurrentBatchContext != null) {
+                waitForPreviousConcurrentBatch(table, context);
+            }
+            preparePostIndexMutations(context, now, indexMetaData);
         }
         if (hasLocalIndex(indexMetaData)) {
+            // Group all the updates for a single row into a single update to be processed (for local indexes)
+            Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
             handleLocalIndexUpdates(table, miniBatchOp, mutations, indexMetaData);
         }
         if (failDataTableUpdatesForTesting) {
@@ -942,9 +1001,17 @@ public class IndexRegionObserver extends BaseRegionObserver {
           return;
       }
       try {
-          for (RowLock rowLock : context.rowLocks) {
-              rowLock.release();
+          if (success) {
+              context.currentPhase = BatchMutatePhase.POST;
+          } else {
+              context.currentPhase = BatchMutatePhase.FAILED;
+          }
+          if (context.waitList != null) {
+              for (CountDownLatch countDownLatch : context.waitList) {
+                  countDownLatch.countDown();
+              }
           }
+          unlockRows(context);
           this.builder.batchCompleted(miniBatchOp);
 
           if (success) { // The pre-index and data table updates are successful, and now, do post index updates