You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this plain-text rendering.)
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/11/07 19:52:04 UTC
[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
new 69448a8 PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes
69448a8 is described below
commit 69448a845e843628558e17032287fe9e2bb6c5cf
Author: Kadir <ko...@salesforce.com>
AuthorDate: Wed Nov 6 22:04:20 2019 -0800
PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes
---
.../phoenix/hbase/index/IndexRegionObserver.java | 51 ++++++++--------------
1 file changed, 17 insertions(+), 34 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e8d9a05..b058b33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -104,19 +104,12 @@ public class IndexRegionObserver extends BaseRegionObserver {
* Class to represent pending data table rows
*/
private static class PendingRow {
- private long latestTimestamp;
- private long count;
+ private boolean concurrent = false;
+ private long count = 1;
- PendingRow(long latestTimestamp) {
- count = 1;
- this.latestTimestamp = latestTimestamp;
- }
-
- public void add(long timestamp) {
+ public void add() {
count++;
- if (latestTimestamp < timestamp) {
- latestTimestamp = timestamp;
- }
+ concurrent = true;
}
public void remove() {
@@ -127,8 +120,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
return count;
}
- public long getLatestTimestamp() {
- return latestTimestamp;
+ public boolean isConcurrent() {
+ return concurrent;
}
}
@@ -159,10 +152,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
// The collection of candidate index mutations that will be applied after the data table mutations
private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
- // The set of row keys for the data table rows of this batch such that for each of these rows there exists another
- // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the
- // row (i.e., concurrent updates).
- private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
long dataWriteStartTime;
@@ -401,16 +390,15 @@ public class IndexRegionObserver extends BaseRegionObserver {
}
}
- private void populatePendingRows(BatchMutateContext context, long now) {
+ private void populatePendingRows(BatchMutateContext context) {
for (RowLock rowLock : context.rowLocks) {
ImmutableBytesPtr rowKey = rowLock.getRowKey();
PendingRow pendingRow = pendingRows.get(rowKey);
if (pendingRow == null) {
- pendingRows.put(rowKey, new PendingRow(now));
+ pendingRows.put(rowKey, new PendingRow());
} else {
// m is a mutation on a row that has already a pending mutation in progress from another batch
- pendingRow.add(now);
- context.pendingRows.add(rowKey);
+ pendingRow.add();
}
}
}
@@ -579,17 +567,12 @@ public class IndexRegionObserver extends BaseRegionObserver {
Put unverifiedPut = new Put(m.getRow());
unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
- // Ignore post index updates (i.e., the third write phase updates) for this row if it is
- // going through concurrent updates
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
- if (!context.pendingRows.contains(rowKey)) {
- if (m instanceof Put) {
- // Remove the empty column prepared by Index codec as we need to change its value
- removeEmptyColumn(m, emptyCF, emptyCQ);
- ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
- }
- context.intermediatePostIndexUpdates.add(next);
+ if (m instanceof Put) {
+ // Remove the empty column prepared by Index codec as we need to change its value
+ removeEmptyColumn(m, emptyCF, emptyCQ);
+ ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
}
+ context.intermediatePostIndexUpdates.add(next);
}
}
}
@@ -639,7 +622,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
// Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
// concurrent updates
if (replayWrite == null) {
- populatePendingRows(context, now);
+ populatePendingRows(context);
}
// First group all the updates for a single row into a single update to be processed
Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
@@ -682,9 +665,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
PendingRow pendingRow = pendingRows.get(rowKey);
- // Has any concurrent mutation arrived for the same row? if so, skip post index updates
+ // Are there concurrent updates on the data table row? if so, skip post index updates
// and let read repair resolve conflicts
- if (pendingRow.getLatestTimestamp() > now) {
+ if (pendingRow.isConcurrent()) {
iterator.remove();
}
}