You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/21 01:07:37 UTC

(pinot) branch master updated: Fix Bug in Handling Equal Comparison Column Values in Upsert (#12395)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d0c28fb9d9 Fix Bug in Handling Equal Comparison Column Values in Upsert (#12395)
d0c28fb9d9 is described below

commit d0c28fb9d94a8cf09fb46a16d071a0f715839163
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Tue Feb 20 17:07:31 2024 -0800

    Fix Bug in Handling Equal Comparison Column Values in Upsert (#12395)
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 43 ++++++++++++++++++++++
 ...oncurrentMapPartitionUpsertMetadataManager.java |  3 ++
 .../BasePartitionUpsertMetadataManagerTest.java    | 36 ++++++++++++++++++
 3 files changed, 82 insertions(+)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 304de5bdfe..cdf2a61fe9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -27,9 +27,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +48,7 @@ import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -601,6 +604,46 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  /**
+   * De-duplicates the records of a new segment so that at most one record per primary key is returned. When we
+   * process a new segment and several records share a primary key, Partial Upsert tables must update the record
+   * location map only with the winning version of the record; otherwise Realtime consumption can potentially end
+   * up reading the wrong version of a record, which will lead to permanent data-inconsistency.
+   *
+   * <p>
+   *  For each primary key (hashed with the given hash function) the record with the greatest comparison value is
+   *  kept. When comparison values tie, the record returned later by the input iterator replaces the earlier one.
+   *  The input iterator is fully consumed before this method returns, and the result is backed by a HashMap, so
+   *  the returned records are NOT guaranteed to be in the order in which they were read.
+   * </p>
+   *
+   * @param recordInfoIterator iterator over the records of the new segment; it is drained by this call
+   * @param hashFunction       hash function configured for Upsert's primary keys
+   * @return iterator over the de-duplicated records: one per primary key, holding the greatest comparison value,
+   *         with ties resolved in favor of the record seen last
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected static Iterator<RecordInfo> resolveComparisonTies(
+      Iterator<RecordInfo> recordInfoIterator, HashFunction hashFunction) {
+    Map<Object, RecordInfo> deDuplicatedRecordInfo = new HashMap<>(); // hashed primary key -> current winner
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      Comparable newComparisonValue = recordInfo.getComparisonValue();
+      deDuplicatedRecordInfo.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), hashFunction),
+          (key, maxComparisonValueRecordInfo) -> {
+            if (maxComparisonValueRecordInfo == null) { // first record seen for this primary key
+              return recordInfo;
+            }
+            int comparisonResult = newComparisonValue.compareTo(maxComparisonValueRecordInfo.getComparisonValue());
+            if (comparisonResult >= 0) { // '>=' (not '>') so that on a tie the later record wins
+              return recordInfo;
+            }
+            return maxComparisonValueRecordInfo;
+          });
+    }
+    return deDuplicatedRecordInfo.values().iterator();
+  }
+
   @Override
   public void takeSnapshot() {
     if (!_enableSnapshot) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 887582538c..735750ff9d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -69,6 +69,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     String segmentName = segment.getSegmentName();
     segment.enableUpsert(this, validDocIds, queryableDocIds);
 
+    if (_partialUpsertHandler != null) {
+      recordInfoIterator = resolveComparisonTies(recordInfoIterator, _hashFunction);
+    }
     AtomicInteger numKeysInWrongSegment = new AtomicInteger();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 5fce4cc3b0..2f02e563f9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -36,7 +37,9 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -45,6 +48,7 @@ import org.testng.annotations.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
@@ -111,6 +115,34 @@ public class BasePartitionUpsertMetadataManagerTest {
     assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
   }
 
+  @Test
+  public void testResolveComparisonTies() {
+    // Six records over three primary keys (0, 1, 2); all timestamps are 0, so records sharing a key should tie
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{0, 0, 0, 0, 0, 0};
+    int numRecords = primaryKeys.length;
+    List<RecordInfo> recordInfoList = new ArrayList<>();
+    for (int docId = 0; docId < numRecords; docId++) {
+      recordInfoList.add(new RecordInfo(
+          makePrimaryKey(primaryKeys[docId]), docId, timestamps[docId], false));
+    }
+    // Resolve comparison ties using HashFunction.NONE for the primary keys
+    Iterator<RecordInfo> deDuplicatedRecords =
+        BasePartitionUpsertMetadataManager.resolveComparisonTies(recordInfoList.iterator(), HashFunction.NONE);
+    // Ensure each primary key appears at most once in the output (fails on the first repeated key)
+    Map<PrimaryKey, RecordInfo> recordsByPrimaryKeys = new HashMap<>();
+    while (deDuplicatedRecords.hasNext()) {
+      RecordInfo recordInfo = deDuplicatedRecords.next();
+      assertFalse(recordsByPrimaryKeys.containsKey(recordInfo.getPrimaryKey()));
+      recordsByPrimaryKeys.put(recordInfo.getPrimaryKey(), recordInfo);
+    }
+    assertEquals(recordsByPrimaryKeys.size(), 3);
+    // On a tie, the record added last for that key (i.e. the one with the highest docId) must be the survivor
+    assertEquals(recordsByPrimaryKeys.get(makePrimaryKey(0)).getDocId(), 5);
+    assertEquals(recordsByPrimaryKeys.get(makePrimaryKey(1)).getDocId(), 4);
+    assertEquals(recordsByPrimaryKeys.get(makePrimaryKey(2)).getDocId(), 2);
+  }
+
   private static ThreadSafeMutableRoaringBitmap createValidDocIds(int... docIds) {
     MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
     bitmap.add(docIds);
@@ -132,6 +164,10 @@ public class BasePartitionUpsertMetadataManagerTest {
     };
   }
 
+  private static PrimaryKey makePrimaryKey(int value) { // test helper: key built from a one-element Object[]
+    return new PrimaryKey(new Object[]{value});
+  }
+
   private static class DummyPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager {
 
     protected DummyPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org