You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/23 22:52:08 UTC
[pinot] branch master updated: Simplify the handling for partial-upsert record update (#10970)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e8841316fa Simplify the handling for partial-upsert record update (#10970)
e8841316fa is described below
commit e8841316fa63bf5406d03f3459ae0c40985fbf10
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jun 23 15:52:01 2023 -0700
Simplify the handling for partial-upsert record update (#10970)
---
...oncurrentMapPartitionUpsertMetadataManager.java | 47 ++++++++--------------
1 file changed, 16 insertions(+), 31 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index e7ad9c5919..a65ce670bd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -256,40 +255,26 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
assert _partialUpsertHandler != null;
AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>();
- AtomicBoolean outOfOrder = new AtomicBoolean();
- RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent(
- HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> {
- if (recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) {
- if (!recordInfo.isDeleteRecord()) {
- IndexSegment currentSegment = recordLocation.getSegment();
- int currentDocId = recordLocation.getDocId();
- ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds();
- if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
- // if delete is not enabled or previous record not marked as deleted
- _reuse.clear();
- previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
- }
+ _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
+ (pk, recordLocation) -> {
+ // Read the previous record if the following conditions are met:
+ // - New record is not a DELETE record
+ // - New record is not out-of-order
+ // - Previous record is not deleted
+ if (!recordInfo.isDeleteRecord()
+ && recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) {
+ IndexSegment currentSegment = recordLocation.getSegment();
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds();
+ int currentDocId = recordLocation.getDocId();
+ if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
+ _reuse.clear();
+ previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
}
- } else {
- outOfOrder.set(true);
}
return recordLocation;
});
- if (currentRecordLocation != null) {
- // Existing primary key
- if (!outOfOrder.get()) {
- GenericRow previousRecord = previousRecordReference.get();
- if (previousRecord == null) {
- return record;
- }
- return _partialUpsertHandler.merge(previousRecord, record);
- } else {
- return record;
- }
- } else {
- // New primary key
- return record;
- }
+ GenericRow previousRecord = previousRecordReference.get();
+ return previousRecord != null ? _partialUpsertHandler.merge(previousRecord, record) : record;
}
@VisibleForTesting
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org