You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/23 22:52:08 UTC

[pinot] branch master updated: Simplify the handling for partial-upsert record update (#10970)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e8841316fa Simplify the handling for partial-upsert record update (#10970)
e8841316fa is described below

commit e8841316fa63bf5406d03f3459ae0c40985fbf10
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jun 23 15:52:01 2023 -0700

    Simplify the handling for partial-upsert record update (#10970)
---
 ...oncurrentMapPartitionUpsertMetadataManager.java | 47 ++++++++--------------
 1 file changed, 16 insertions(+), 31 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index e7ad9c5919..a65ce670bd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
@@ -256,40 +255,26 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
     assert _partialUpsertHandler != null;
     AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>();
-    AtomicBoolean outOfOrder = new AtomicBoolean();
-    RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent(
-        HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> {
-          if (recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) {
-            if (!recordInfo.isDeleteRecord()) {
-              IndexSegment currentSegment = recordLocation.getSegment();
-              int currentDocId = recordLocation.getDocId();
-              ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds();
-              if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
-                // if delete is not enabled or previous record not marked as deleted
-                _reuse.clear();
-                previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
-              }
+    _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
+        (pk, recordLocation) -> {
+          // Read the previous record if the following conditions are met:
+          // - New record is not a DELETE record
+          // - New record is not out-of-order
+          // - Previous record is not deleted
+          if (!recordInfo.isDeleteRecord()
+              && recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) {
+            IndexSegment currentSegment = recordLocation.getSegment();
+            ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds();
+            int currentDocId = recordLocation.getDocId();
+            if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
+              _reuse.clear();
+              previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
             }
-          } else {
-            outOfOrder.set(true);
           }
           return recordLocation;
         });
-    if (currentRecordLocation != null) {
-      // Existing primary key
-      if (!outOfOrder.get()) {
-        GenericRow previousRecord = previousRecordReference.get();
-        if (previousRecord == null) {
-          return record;
-        }
-        return _partialUpsertHandler.merge(previousRecord, record);
-      } else {
-        return record;
-      }
-    } else {
-      // New primary key
-      return record;
-    }
+    GenericRow previousRecord = previousRecordReference.get();
+    return previousRecord != null ? _partialUpsertHandler.merge(previousRecord, record) : record;
   }
 
   @VisibleForTesting


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org