Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/10/13 03:30:50 UTC
[phoenix] branch master updated: PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 4b2ff49 PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
4b2ff49 is described below
commit 4b2ff49fde9b52741e6ec4b7cb1e90724f4ed63a
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Sun Sep 27 15:59:20 2020 -0700
PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
---
.../phoenix/hbase/index/IndexRegionObserver.java | 329 +++++++++++----------
1 file changed, 169 insertions(+), 160 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 0457481..4d804e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -76,7 +76,6 @@ import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException
import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
import org.apache.phoenix.hbase.index.builder.IndexBuilder;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
@@ -98,6 +97,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.prepareIndexMutationsForRebuild;
@@ -125,24 +126,29 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
* Class to represent pending data table rows
*/
private static class PendingRow {
- private boolean concurrent = false;
- private long count = 1;
+ private int count;
+ private BatchMutateContext lastContext;
- public void add() {
+ PendingRow(BatchMutateContext context) {
+ count = 1;
+ lastContext = context;
+ }
+
+ public void add(BatchMutateContext context) {
count++;
- concurrent = true;
+ lastContext = context;
}
public void remove() {
count--;
}
- public long getCount() {
+ public int getCount() {
return count;
}
- public boolean isConcurrent() {
- return concurrent;
+ public BatchMutateContext getLastContext() {
+ return lastContext;
}
}
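
Taken together, count and lastContext let a newly arriving batch both detect concurrency and reuse the in-flight batch's view of the row. For illustration only (not part of this commit), a minimal sketch of that lookup; the pendingRows map and the helper below are hypothetical stand-ins for fields and methods introduced later in this diff, and the caller is assumed to hold the row lock for rowKey, which is what makes reading the other batch's context safe:

    import java.util.Map;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

    // Sketch: returns the row state a new batch should build on, or null if
    // the row must be read from the data table instead.
    private Put expectedRowState(Map<ImmutableBytesPtr, PendingRow> pendingRows,
                                 ImmutableBytesPtr rowKey) {
        PendingRow pendingRow = pendingRows.get(rowKey);
        if (pendingRow == null) {
            return null; // no concurrent batch on this row
        }
        BatchMutateContext last = pendingRow.getLastContext();
        if (last.getCurrentPhase() == BatchMutatePhase.PRE) {
            // The concurrent batch has not committed yet; its "next" row
            // state is the current state as far as this batch is concerned.
            return last.getNextDataRowState(rowKey);
        }
        return null;
    }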
@@ -161,9 +167,25 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
failDataTableUpdatesForTesting = fail;
}
+ public enum BatchMutatePhase {
+ PRE, POST, FAILED
+ }
+
// Hack to get around not being able to save any state between
// coprocessor calls. TODO: remove after HBASE-18127 when available
+
+ /**
+ * A set of concurrent batches of mutations is a set such that every pair of batches in it has at least one row in
+ * common. Since the BatchMutateContext object of a batch is modified only after the row locks for all the rows
+ * mutated by that batch are acquired, only one thread at a time can hold the locks for its batch and thus safely
+ * access all the batch contexts in the set of concurrent batches. Because of this, we do not need atomic variables
+ * or additional locks to serialize access to the BatchMutateContext objects.
+ */
+
private static class BatchMutateContext {
+ private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+ // The maximum of the reference counts on the pending rows of this batch at the time the batch arrives
+ private int maxPendingRowCount = 0;
private final int clientVersion;
// The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
// the verified column) will have the value false ("unverified") on these mutations
@@ -175,15 +197,42 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates;
private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
- private boolean rebuild;
// The current and next states of the data rows corresponding to the pending mutations
private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
- // Data table pending mutations
+ // The previous concurrent batch contexts
+ private HashMap<ImmutableBytesPtr, BatchMutateContext> lastConcurrentBatchContext = null;
+ // The latches of the threads waiting for this batch to complete
+ private List<CountDownLatch> waitList = null;
private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
private BatchMutateContext(int clientVersion) {
this.clientVersion = clientVersion;
}
+
+ public BatchMutatePhase getCurrentPhase() {
+ return currentPhase;
+ }
+
+ public Put getNextDataRowState(ImmutableBytesPtr rowKeyPtr) {
+ Pair<Put, Put> rowState = dataRowStates.get(rowKeyPtr);
+ if (rowState != null) {
+ return rowState.getSecond();
+ }
+ return null;
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ if (waitList == null) {
+ waitList = new ArrayList<>();
+ }
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ waitList.add(countDownLatch);
+ return countDownLatch;
+ }
+
+ public int getMaxPendingRowCount() {
+ return maxPendingRowCount;
+ }
}
private ThreadLocal<BatchMutateContext> batchMutateContext =
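
The waitList above implements a one-shot hand-off: later concurrent batches park on latches that this batch opens when it finishes. A standalone sketch of the pattern (hypothetical class; in IndexRegionObserver the row locks, not synchronized blocks, serialize access to the context, as the comment above explains):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    class LatchHandOffSketch {
        private List<CountDownLatch> waitList = null;

        // Called by a later concurrent batch while it holds the row locks.
        CountDownLatch getCountDownLatch() {
            if (waitList == null) {
                waitList = new ArrayList<>();
            }
            CountDownLatch latch = new CountDownLatch(1);
            waitList.add(latch);
            return latch;
        }

        // Called once this batch reaches POST or FAILED; wakes every waiter.
        void complete() {
            if (waitList == null) {
                return;
            }
            for (CountDownLatch latch : waitList) {
                latch.countDown();
            }
        }
    }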
@@ -223,9 +272,11 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
private long slowIndexPrepareThreshold;
private long slowPreIncrementThreshold;
private int rowLockWaitDuration;
+ private int concurrentMutationWaitDuration;
private String dataTableName;
private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+ private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
@Override
public Optional<RegionObserver> getRegionObserver() {
@@ -259,8 +310,9 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
- this.lockManager = new LockManager();
-
+ this.lockManager = new LockManager();
+ this.concurrentMutationWaitDuration = env.getConfiguration().getInt("phoenix.index.concurrent.wait.duration.ms",
+ DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS);
// Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
setSlowThresholds(e.getConfiguration());
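
The new phoenix.index.concurrent.wait.duration.ms property defaults to 100 ms per level of the dependency tree of concurrent batches. If a contended workload sees spurious retries, it can be raised like any other HBase setting; a sketch for an embedded or test configuration (the value 500 is illustrative, not a recommendation):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Allow up to 500 ms of waiting per level of concurrent batches.
    conf.setInt("phoenix.index.concurrent.wait.duration.ms", 500);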
@@ -409,15 +461,22 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
}
}
+ private void unlockRows(BatchMutateContext context) throws IOException {
+ for (RowLock rowLock : context.rowLocks) {
+ rowLock.release();
+ }
+ context.rowLocks.clear();
+ }
+
private void populatePendingRows(BatchMutateContext context) {
for (RowLock rowLock : context.rowLocks) {
ImmutableBytesPtr rowKey = rowLock.getRowKey();
PendingRow pendingRow = pendingRows.get(rowKey);
if (pendingRow == null) {
- pendingRows.put(rowKey, new PendingRow());
+ pendingRows.put(rowKey, new PendingRow(context));
} else {
// m is a mutation on a row that already has a pending mutation in progress from another batch
- pendingRow.add();
+ pendingRow.add(context);
}
}
}
@@ -608,15 +667,34 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
BatchMutateContext context) throws IOException {
Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+ context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
- keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+ PendingRow pendingRow = pendingRows.get(rowKeyPtr);
+ if (pendingRow != null && pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
+ if (context.lastConcurrentBatchContext == null) {
+ context.lastConcurrentBatchContext = new HashMap<>();
+ }
+ context.lastConcurrentBatchContext.put(rowKeyPtr, pendingRow.getLastContext());
+ if (context.maxPendingRowCount < pendingRow.getCount()) {
+ context.maxPendingRowCount = pendingRow.getCount();
+ }
+ Put put = pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
+ if (put != null) {
+ context.dataRowStates.put(rowKeyPtr, new Pair<Put, Put>(put, new Put(put)));
+ }
+ }
+ else {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+ }
+ }
+ if (keys.isEmpty()) {
+ return;
}
Scan scan = new Scan();
ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
scanRanges.initializeScan(scan);
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
scan.setFilter(skipScanFilter);
- context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
boolean more = true;
while(more) {
@@ -765,43 +843,11 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
return (PhoenixIndexMetaData)indexMetaData;
}
- /**
- * IndexMaintainer.getIndexedColumns() returns the data column references for indexed columns. The data columns are
- * grouped into three classes, pk columns (data table pk columns), the indexed columns (the columns for which
- * we want to have indexing; they form the prefix for the primary key for the index table (after salt and tenant id))
- * and covered columns. The purpose of this method is to find out if all the indexed columns are included in the
- * pending data table mutation pointed by multiMutation.
- */
- private boolean hasAllIndexedColumns(IndexMaintainer indexMaintainer, MultiMutation multiMutation) {
- Map<byte[], List<Cell>> familyMap = multiMutation.getFamilyCellMap();
- for (ColumnReference columnReference : indexMaintainer.getIndexedColumns()) {
- byte[] family = columnReference.getFamily();
- List<Cell> cellList = familyMap.get(family);
- if (cellList == null) {
- return false;
- }
- boolean has = false;
- for (Cell cell : cellList) {
- if (CellUtil.matchingColumn(cell, family, columnReference.getQualifier())) {
- has = true;
- break;
- }
- }
- if (!has) {
- return false;
- }
- }
- return true;
- }
-
- private void preparePostIndexMutations(TableName table,
- BatchMutateContext context,
+ private void preparePostIndexMutations(BatchMutateContext context,
long now,
- PhoenixIndexMetaData indexMetaData)
- throws Throwable {
+ PhoenixIndexMetaData indexMetaData) {
context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
- // Check if we need to skip post index update for any of the rows
for (IndexMaintainer indexMaintainer : maintainers) {
byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
@@ -809,90 +855,21 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
for (Pair<Mutation, byte[]> update : updates) {
- // Are there concurrent updates on the data table row? if so, skip post index updates
- // and let read repair resolve conflicts
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
- PendingRow pendingRow = pendingRows.get(rowKey);
- if (!pendingRow.isConcurrent()) {
- Mutation m = update.getFirst();
- if (m instanceof Put) {
- Put verifiedPut = new Put(m.getRow());
- // Set the status of the index row to "verified"
- verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
- context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
- } else {
- context.postIndexUpdates.put(hTableInterfaceReference, m);
- }
+ Mutation m = update.getFirst();
+ if (m instanceof Put) {
+ Put verifiedPut = new Put(m.getRow());
+ // Set the status of the index row to "verified"
+ verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+ context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
} else {
- if (!hasAllIndexedColumns(indexMaintainer, context.multiMutationMap.get(rowKey))) {
- // This batch needs to be retried since one of the concurrent mutations does not have the value
- // for an indexed column. Not including an index column may lead to incorrect index row key
- // generation for concurrent mutations since concurrent mutations are not serialized entirely
- // and do not see each other's effect on data table. Throwing an IOException will result in
- // retries of this batch. Before throwing exception, we need to remove reference counts and
- // locks for the rows of this batch
- removePendingRows(context);
- context.indexUpdates.clear();
- for (RowLock rowLock : context.rowLocks) {
- rowLock.release();
- }
- context.rowLocks.clear();
- throw new IOException("One of the concurrent mutations does not have all indexed columns. " +
- "The batch needs to be retried " + table.getNameAsString());
- }
+ context.postIndexUpdates.put(hTableInterfaceReference, m);
}
}
}
-
- // We are done with handling concurrent mutations. So we can remove the rows of this batch from
- // the collection of pending rows
removePendingRows(context);
context.indexUpdates.clear();
}
- /**
- * There are at most two rebuild mutation for every row, one put and one delete. They are listed in indexMutations
- * next to each other such that put comes before delete by {@link IndexRebuildRegionScanner}. This method is called
- * only for global indexes.
- */
- private void preBatchMutateWithExceptionsForRebuild(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Mutation> miniBatchOp,
- BatchMutateContext context,
- IndexMaintainer indexMaintainer) throws Throwable {
- Put put = null;
- List <Mutation> indexMutations = new ArrayList<>();
- for (int i = 0; i < miniBatchOp.size(); i++) {
- if (miniBatchOp.getOperationStatus(i) == IGNORE) {
- continue;
- }
- Mutation m = miniBatchOp.getOperation(i);
- if (!this.builder.isEnabled(m)) {
- continue;
- }
- if (m instanceof Put) {
- if (put != null) {
- indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
- }
- put = (Put)m;
- } else {
- indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, (Delete)m));
- put = null;
- }
- miniBatchOp.setOperationStatus(i, NOWRITE);
- }
- if (put != null) {
- indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
- }
- HTableInterfaceReference hTableInterfaceReference =
- new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
- context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
- for (Mutation m : indexMutations) {
- context.preIndexUpdates.put(hTableInterfaceReference, m);
- }
- doPre(c, context, miniBatchOp);
- // For rebuild updates, no post index update is prepared. Just create an empty list.
- context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
- }
private static boolean hasGlobalIndex(PhoenixIndexMetaData indexMetaData) {
for (IndexMaintainer indexMaintainer : indexMetaData.getIndexMaintainers()) {
@@ -912,46 +889,75 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
return false;
}
+ private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
+ throws Throwable {
+ boolean done;
+ BatchMutatePhase phase;
+ done = true;
+ for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
+ phase = lastContext.getCurrentPhase();
+ if (phase == BatchMutatePhase.FAILED) {
+ done = false;
+ break;
+ }
+ if (phase == BatchMutatePhase.PRE) {
+ CountDownLatch countDownLatch = lastContext.getCountDownLatch();
+ // Release the locks so that the previous concurrent mutation can go into the post phase
+ unlockRows(context);
+ // Wait for at most one concurrentMutationWaitDuration for each level in the dependency tree of batches.
+ // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed to by lastContext
+ if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
+ TimeUnit.MILLISECONDS)) {
+ done = false;
+ break;
+ }
+ // Acquire the locks again before letting the region proceed with data table updates
+ lockRows(context);
+ }
+ }
+ if (!done) {
+ // This batch needs to be retried since one of the previous concurrent batches has not completed yet.
+ // Throwing an IOException will result in retries of this batch. Before throwing the exception,
+ // we need to remove reference counts and locks for the rows of this batch
+ removePendingRows(context);
+ context.indexUpdates.clear();
+ for (RowLock rowLock : context.rowLocks) {
+ rowLock.release();
+ }
+ context.rowLocks.clear();
+ throw new IOException("One of the previous concurrent mutations has not completed. " +
+ "The batch needs to be retried " + table.getNameAsString());
+ }
+ }
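
To make the timeout arithmetic concrete: with the default wait of 100 ms, a batch waiting behind a predecessor whose maxPendingRowCount is 2 blocks for at most (2 + 1) * 100 = 300 ms on that predecessor's latch before giving up and triggering a retry of its own batch. A sketch of the computation with hypothetical values:

    int maxPendingRowCount = 2;               // depth of the predecessor's subtree
    int concurrentMutationWaitDuration = 100; // ms, the default
    long timeoutMs = (maxPendingRowCount + 1) * concurrentMutationWaitDuration;
    // timeoutMs == 300; passed to countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS)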
+
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
ignoreAtomicOperations(miniBatchOp);
PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
setBatchMutateContext(c, context);
- Mutation firstMutation = miniBatchOp.getOperation(0);
- ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
- context.rebuild = replayWrite != null;
- if (context.rebuild) {
- preBatchMutateWithExceptionsForRebuild(c, miniBatchOp, context, indexMetaData.getIndexMaintainers().get(0));
- return;
- }
/*
* Exclusively lock all rows so we get a consistent read
* while determining the index updates
*/
populateRowsToLock(miniBatchOp, context);
+ // early exit if it turns out we don't have any updates for the indexes
+ if (context.rowsToLock.isEmpty()) {
+ return;
+ }
lockRows(context);
-
long now = EnvironmentEdgeManager.currentTimeMillis();
- // Unless we're replaying edits to rebuild the index, we update the time stamp
- // of the data table to prevent overlapping time stamps (which prevents index
+ // Update the timestamps of the data table mutations to prevent overlapping timestamps (which prevents index
// inconsistencies as this case isn't handled correctly currently).
setTimestamps(miniBatchOp, builder, now);
- // Group all the updates for a single row into a single update to be processed (for local indexes, and global index retries)
- Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
- // early exit if it turns out we don't have any edits
- if (mutations == null || mutations.isEmpty()) {
- return;
- }
-
TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
if (hasGlobalIndex(indexMetaData)) {
+ // Prepare the current and next data row states for pending mutations (for global indexes)
+ prepareDataRowStates(c, miniBatchOp, context, now);
// Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
// concurrent updates
populatePendingRows(context);
- // Prepare current and next data rows states for pending mutations (for global indexes)
- prepareDataRowStates(c, miniBatchOp, context, now);
// early exit if it turns out we don't have any edits
long start = EnvironmentEdgeManager.currentTimeMillis();
preparePreIndexMutations(context, now, indexMetaData);
@@ -965,17 +971,19 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
LOG.debug("slept 1ms for " + table.getNameAsString());
}
// Release the locks before making RPC calls for index updates
- for (RowLock rowLock : context.rowLocks) {
- rowLock.release();
- }
+ unlockRows(context);
// Do the first phase index updates
doPre(c, context, miniBatchOp);
// Acquire the locks again before letting the region proceed with data table updates
- context.rowLocks.clear();
lockRows(context);
- preparePostIndexMutations(table, context, now, indexMetaData);
+ if (context.lastConcurrentBatchContext != null) {
+ waitForPreviousConcurrentBatch(table, context);
+ }
+ preparePostIndexMutations(context, now, indexMetaData);
}
if (hasLocalIndex(indexMetaData)) {
+ // Group all the updates for a single row into a single update to be processed (for local indexes)
+ Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
handleLocalIndexUpdates(table, miniBatchOp, mutations, indexMetaData);
}
if (failDataTableUpdatesForTesting) {
@@ -1006,9 +1014,17 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
return;
}
try {
- for (RowLock rowLock : context.rowLocks) {
- rowLock.release();
+ if (success) {
+ context.currentPhase = BatchMutatePhase.POST;
+ } else {
+ context.currentPhase = BatchMutatePhase.FAILED;
}
+ if (context.waitList != null) {
+ for (CountDownLatch countDownLatch : context.waitList) {
+ countDownLatch.countDown();
+ }
+ }
+ unlockRows(context);
this.builder.batchCompleted(miniBatchOp);
if (success) { // The pre-index and data table updates are successful, and now, do post index updates
@@ -1077,9 +1093,6 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- if (ignoreIndexRebuildForTesting && context.rebuild) {
- return;
- }
long start = EnvironmentEdgeManager.currentTimeMillis();
try {
if (failPreIndexUpdatesForTesting) {
@@ -1097,11 +1110,7 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
// postBatchMutateIndispensably() is called
removePendingRows(context);
context.rowLocks.clear();
- if (context.rebuild) {
- throw new IOException(String.format("%s for rebuild", e.getMessage()), e);
- } else {
- rethrowIndexingException(e);
- }
+ rethrowIndexingException(e);
}
throw new RuntimeException(
"Somehow didn't complete the index update, but didn't return succesfully either!");