Posted to commits@pinot.apache.org by "tibrewalpratik17 (via GitHub)" <gi...@apache.org> on 2024/02/16 06:51:58 UTC

Re: [PR] Fix Bug in Handling Equal Comparison Column Values in Upsert [pinot]

tibrewalpratik17 commented on code in PR #12395:
URL: https://github.com/apache/pinot/pull/12395#discussion_r1492024212


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -167,6 +172,45 @@ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeM
     }
   }
 
+  /**
+   * When we have to process a new segment, if there are comparison value ties for the same primary-key within the
+   * segment, then for Partial Upsert tables we need to make sure that the record location map is updated only
+   * for the latest version of the record. This is specifically a concern for Partial Upsert tables because Realtime
+   * consumption can potentially end up reading the wrong version of a record, which will lead to permanent
+   * data-inconsistency.
+   *
+   * <p>
+   *  This function returns an iterator that will de-dup
+   *  records
+   *  with the same primary-key. Moreover, for comparison ties, it will only keep the latest record. This iterator can
+   *  then further be used to update the primary-key record location map safely.
+   * </p>
+   *
+   * @param recordInfoIterator iterator over the new segment
+   * @return iterator that returns de-duplicated records. To resolve ties for comparison column values, we prefer to
+   *         return the latest record.
+   */
+  @VisibleForTesting
+  protected Iterator<RecordInfo> resolveComparisonTies(Iterator<RecordInfo> recordInfoIterator) {
+    Map<Object, RecordInfo> deDuplicatedRecordInfo = new HashMap<>();
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      Comparable newComparisonValue = recordInfo.getComparisonValue();
+      deDuplicatedRecordInfo.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
+          (key, existingRecordInfo) -> {

Review Comment:
   Can we rename `existingRecordInfo` to `maxComparisonValueRecordInfo` for clarity?
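
   For illustration, here is a minimal, self-contained sketch of how the lambda could read with the proposed name. It is not the PR's code: `SketchRecord` and `TieResolutionSketch` are hypothetical stand-ins for `RecordInfo` and the manager class, the primary key is used directly as the map key instead of going through `HashUtils.hashPrimaryKey`, and "latest" is assumed to mean the record seen later in iteration order.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for RecordInfo; it carries only the fields this sketch needs.
final class SketchRecord {
  final String primaryKey;
  final int docId;
  final long comparisonValue;

  SketchRecord(String primaryKey, int docId, long comparisonValue) {
    this.primaryKey = primaryKey;
    this.docId = docId;
    this.comparisonValue = comparisonValue;
  }
}

final class TieResolutionSketch {
  // Keeps one record per primary key: the one with the largest comparison value.
  // On a tie, the record seen later in iteration order replaces the earlier one
  // (an assumption about what "latest" means).
  static Iterator<SketchRecord> resolveComparisonTies(Iterator<SketchRecord> recordInfoIterator) {
    Map<String, SketchRecord> deDuplicatedRecordInfo = new HashMap<>();
    while (recordInfoIterator.hasNext()) {
      SketchRecord recordInfo = recordInfoIterator.next();
      deDuplicatedRecordInfo.compute(recordInfo.primaryKey, (key, maxComparisonValueRecordInfo) -> {
        if (maxComparisonValueRecordInfo == null) {
          // First record seen for this primary key.
          return recordInfo;
        }
        // ">=" rather than ">" so that a later record wins a tie.
        return recordInfo.comparisonValue >= maxComparisonValueRecordInfo.comparisonValue
            ? recordInfo
            : maxComparisonValueRecordInfo;
      });
    }
    // HashMap iteration order is unspecified; callers should not rely on it.
    return deDuplicatedRecordInfo.values().iterator();
  }
}
```

   With the new name, the tie-breaking branch reads as a comparison against the current maximum, which is what the map entry holds at that point.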



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -69,6 +71,9 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
     String segmentName = segment.getSegmentName();
     segment.enableUpsert(this, validDocIds, queryableDocIds);
 
+    if (_partialUpsertHandler != null) {
+      recordInfoIterator = resolveComparisonTies(recordInfoIterator);
+    }

Review Comment:
   Can we move this to the base class? We would want to do this in all scenarios.
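
   A rough sketch of what this could look like, using a template method so that no subclass can skip the step. All names here (`KeyedRecord`, `BaseUpsertManagerSketch`, `MapBackedManagerSketch`, `doAddOrReplaceSegment`) are hypothetical and only illustrate the shape; the real base class and its method signatures may differ.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for RecordInfo.
final class KeyedRecord {
  final String primaryKey;
  final long comparisonValue;
  final int docId;

  KeyedRecord(String primaryKey, long comparisonValue, int docId) {
    this.primaryKey = primaryKey;
    this.comparisonValue = comparisonValue;
    this.docId = docId;
  }
}

// Hypothetical base class: tie resolution lives here and runs for every subclass.
abstract class BaseUpsertManagerSketch {
  // Template method; final so that subclasses cannot bypass the de-dup step.
  final void addOrReplaceSegment(Iterator<KeyedRecord> recordInfoIterator) {
    doAddOrReplaceSegment(resolveComparisonTies(recordInfoIterator));
  }

  // Keeps the record with the largest comparison value per key; later records win ties.
  protected Iterator<KeyedRecord> resolveComparisonTies(Iterator<KeyedRecord> recordInfoIterator) {
    Map<String, KeyedRecord> maxByKey = new LinkedHashMap<>();
    while (recordInfoIterator.hasNext()) {
      KeyedRecord current = recordInfoIterator.next();
      maxByKey.merge(current.primaryKey, current,
          (existing, incoming) -> incoming.comparisonValue >= existing.comparisonValue ? incoming : existing);
    }
    return maxByKey.values().iterator();
  }

  // Subclasses only see de-duplicated records.
  protected abstract void doAddOrReplaceSegment(Iterator<KeyedRecord> deDuplicatedRecords);
}

// Hypothetical concrete manager that records the winning docId per primary key.
final class MapBackedManagerSketch extends BaseUpsertManagerSketch {
  final Map<String, Integer> primaryKeyToDocId = new LinkedHashMap<>();

  @Override
  protected void doAddOrReplaceSegment(Iterator<KeyedRecord> deDuplicatedRecords) {
    while (deDuplicatedRecords.hasNext()) {
      KeyedRecord record = deDuplicatedRecords.next();
      primaryKeyToDocId.put(record.primaryKey, record.docId);
    }
  }
}
```

   The `_partialUpsertHandler != null` check would then disappear from the concrete class, and any other metadata manager implementation would inherit the same behavior.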



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

