You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/21 01:07:37 UTC
(pinot) branch master updated: Fix Bug in Handling Equal Comparison Column Values in Upsert (#12395)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d0c28fb9d9 Fix Bug in Handling Equal Comparison Column Values in Upsert (#12395)
d0c28fb9d9 is described below
commit d0c28fb9d94a8cf09fb46a16d071a0f715839163
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Tue Feb 20 17:07:31 2024 -0800
Fix Bug in Handling Equal Comparison Column Values in Upsert (#12395)
---
.../upsert/BasePartitionUpsertMetadataManager.java | 43 ++++++++++++++++++++++
...oncurrentMapPartitionUpsertMetadataManager.java | 3 ++
.../BasePartitionUpsertMetadataManagerTest.java | 36 ++++++++++++++++++
3 files changed, 82 insertions(+)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 304de5bdfe..cdf2a61fe9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -27,9 +27,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -46,6 +48,7 @@ import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -601,6 +604,46 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
}
+ /**
+ * De-duplicates the records of a new segment by primary key before they are used to update the record location
+ * map. If there are comparison value ties for the same primary-key within the segment, then for Partial Upsert
+ * tables the record location map must be updated only with the latest version of the record. This is specifically
+ * a concern for Partial Upsert tables because Realtime consumption can otherwise end up reading the wrong version
+ * of a record, which leads to permanent data-inconsistency.
+ *
+ * <p>
+ * For each primary key (hashed with {@code hashFunction}), only the record with the greatest comparison value is
+ * kept; on a comparison tie, the record encountered last in {@code recordInfoIterator} wins. The input iterator is
+ * fully consumed before returning, and the surviving records are yielded in no particular (HashMap) order.
+ * </p>
+ *
+ * @param recordInfoIterator iterator over the records of the new segment; fully consumed by this call
+ * @param hashFunction hash function configured for Upsert's primary keys
+ * @return iterator yielding one record per primary key: the one with the greatest comparison value, preferring
+ * the latest record on ties
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected static Iterator<RecordInfo> resolveComparisonTies(
+ Iterator<RecordInfo> recordInfoIterator, HashFunction hashFunction) {
+ Map<Object, RecordInfo> deDuplicatedRecordInfo = new HashMap<>(); // hashed primary key -> winning record so far
+ while (recordInfoIterator.hasNext()) {
+ RecordInfo recordInfo = recordInfoIterator.next();
+ Comparable newComparisonValue = recordInfo.getComparisonValue();
+ deDuplicatedRecordInfo.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), hashFunction),
+ (key, maxComparisonValueRecordInfo) -> {
+ if (maxComparisonValueRecordInfo == null) { // first record seen for this primary key
+ return recordInfo;
+ }
+ int comparisonResult = newComparisonValue.compareTo(maxComparisonValueRecordInfo.getComparisonValue());
+ if (comparisonResult >= 0) { // '>=' (not '>') so that a later record wins a comparison tie
+ return recordInfo;
+ }
+ return maxComparisonValueRecordInfo;
+ });
+ }
+ return deDuplicatedRecordInfo.values().iterator(); // HashMap iteration order is unspecified
+ }
+
@Override
public void takeSnapshot() {
if (!_enableSnapshot) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 887582538c..735750ff9d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -69,6 +69,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
String segmentName = segment.getSegmentName();
segment.enableUpsert(this, validDocIds, queryableDocIds);
+ if (_partialUpsertHandler != null) {
+ recordInfoIterator = resolveComparisonTies(recordInfoIterator, _hashFunction);
+ }
AtomicInteger numKeysInWrongSegment = new AtomicInteger();
while (recordInfoIterator.hasNext()) {
RecordInfo recordInfo = recordInfoIterator.next();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
index 5fce4cc3b0..2f02e563f9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -36,7 +37,9 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -45,6 +48,7 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -111,6 +115,34 @@ public class BasePartitionUpsertMetadataManagerTest {
assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
}
+ @Test
+ public void testResolveComparisonTies() {
+ // Keys 0 and 1 repeat and every record has timestamp 0, so every duplicate is a comparison tie
+ int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+ int[] timestamps = new int[]{0, 0, 0, 0, 0, 0};
+ int numRecords = primaryKeys.length;
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ for (int docId = 0; docId < numRecords; docId++) {
+ recordInfoList.add(new RecordInfo(
+ makePrimaryKey(primaryKeys[docId]), docId, timestamps[docId], false));
+ }
+ // De-duplicate the records, resolving comparison ties
+ Iterator<RecordInfo> deDuplicatedRecords =
+ BasePartitionUpsertMetadataManager.resolveComparisonTies(recordInfoList.iterator(), HashFunction.NONE);
+ // Ensure we get exactly 1 record per unique primary key (the output order is not asserted)
+ Map<PrimaryKey, RecordInfo> recordsByPrimaryKeys = new HashMap<>();
+ while (deDuplicatedRecords.hasNext()) {
+ RecordInfo recordInfo = deDuplicatedRecords.next();
+ assertFalse(recordsByPrimaryKeys.containsKey(recordInfo.getPrimaryKey()));
+ recordsByPrimaryKeys.put(recordInfo.getPrimaryKey(), recordInfo);
+ }
+ assertEquals(recordsByPrimaryKeys.size(), 3);
+ // Ensure ties resolve to the highest docId for each key, i.e. the record that appeared last in the input
+ assertEquals(recordsByPrimaryKeys.get(makePrimaryKey(0)).getDocId(), 5);
+ assertEquals(recordsByPrimaryKeys.get(makePrimaryKey(1)).getDocId(), 4);
+ assertEquals(recordsByPrimaryKeys.get(makePrimaryKey(2)).getDocId(), 2);
+ }
+
private static ThreadSafeMutableRoaringBitmap createValidDocIds(int... docIds) {
MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
bitmap.add(docIds);
@@ -132,6 +164,10 @@ public class BasePartitionUpsertMetadataManagerTest {
};
}
+ private static PrimaryKey makePrimaryKey(int value) {
+ return new PrimaryKey(new Object[]{value}); // single-column primary key wrapping the given int
+ }
+
private static class DummyPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager {
protected DummyPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org