You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/26 19:28:20 UTC
[pinot] branch master updated: Fix the upsert metadata bug when adding segment with same comparison value (#8590)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1d6ac065c1 Fix the upsert metadata bug when adding segment with same comparison value (#8590)
1d6ac065c1 is described below
commit 1d6ac065c11efc3907b12b075a1976142cb34dbf
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Apr 26 12:28:13 2022 -0700
Fix the upsert metadata bug when adding segment with same comparison value (#8590)
Fix a bug in `PartitionUpsertMetadataManager.addSegment()` where comparison values were checked for equality by reference (`==`) instead of by value (`compareTo()`)
---
.../upsert/PartitionUpsertMetadataManager.java | 17 ++---
.../upsert/PartitionUpsertMetadataManagerTest.java | 85 ++++++++++++++++------
2 files changed, 69 insertions(+), 33 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 6ba710f0f0..e416c3c0ca 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -103,13 +103,14 @@ public class PartitionUpsertMetadataManager {
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key
+ IndexSegment currentSegment = currentRecordLocation.getSegment();
+ int comparisonResult = recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue());
// The current record is in the same segment
// Update the record location when there is a tie to keep the newer record. Note that the record info
// iterator will return records with incremental doc ids.
- IndexSegment currentSegment = currentRecordLocation.getSegment();
if (segment == currentSegment) {
- if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
+ if (comparisonResult >= 0) {
validDocIds.replace(currentRecordLocation.getDocId(), recordInfo._docId);
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
} else {
@@ -124,7 +125,7 @@ public class PartitionUpsertMetadataManager {
// doc ids for the old segment because it has not been replaced yet.
String currentSegmentName = currentSegment.getSegmentName();
if (segmentName.equals(currentSegmentName)) {
- if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
+ if (comparisonResult >= 0) {
validDocIds.add(recordInfo._docId);
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
} else {
@@ -136,12 +137,10 @@ public class PartitionUpsertMetadataManager {
// Update the record location when getting a newer comparison value, or the value is the same as the
// current value, but the segment has a larger sequence number (the segment is newer than the current
// segment).
- if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) > 0 || (
- recordInfo._comparisonValue == currentRecordLocation.getComparisonValue()
- && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
- && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
- && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
- currentSegmentName))) {
+ if (comparisonResult > 0 || (comparisonResult == 0 && LLCSegmentName.isLowLevelConsumerSegmentName(
+ segmentName) && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
+ && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
+ currentSegmentName))) {
Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index a4d02cea17..80445d9987 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -60,12 +60,12 @@ public class PartitionUpsertMetadataManagerTest {
// Add the first segment
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 100));
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 80));
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 4, 120));
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 5, 100));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(100)));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, new IntWrapper(80)));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 4, new IntWrapper(120)));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 5, new IntWrapper(100)));
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
@@ -77,11 +77,11 @@ public class PartitionUpsertMetadataManagerTest {
// Add the second segment
ArrayList<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new ArrayList<>();
- recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
- recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
- recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 120));
- recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 3, 80));
- recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 4, 80));
+ recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
+ recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
+ recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(120)));
+ recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 3, new IntWrapper(80)));
+ recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 4, new IntWrapper(80)));
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
@@ -142,13 +142,13 @@ public class PartitionUpsertMetadataManagerTest {
}
private static void checkRecordLocation(Map<Object, RecordLocation> recordLocationMap, int keyValue,
- IndexSegment segment, int docId, int timestamp, UpsertConfig.HashFunction hashFunction) {
+ IndexSegment segment, int docId, int comparisonValue, UpsertConfig.HashFunction hashFunction) {
RecordLocation recordLocation =
recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(keyValue), hashFunction));
assertNotNull(recordLocation);
assertSame(recordLocation.getSegment(), segment);
assertEquals(recordLocation.getDocId(), docId);
- assertEquals(recordLocation.getComparisonValue(), timestamp);
+ assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue);
}
@Test
@@ -166,9 +166,9 @@ public class PartitionUpsertMetadataManagerTest {
// Add the first segment
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 120));
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 100));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(120)));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(100)));
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
@@ -177,7 +177,8 @@ public class PartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
IndexSegment segment2 = mockSegment(1, validDocIds2);
- upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100));
+ upsertMetadataManager.addRecord(segment2,
+ new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, new IntWrapper(100)));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -187,7 +188,8 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
- upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120));
+ upsertMetadataManager.addRecord(segment2,
+ new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, new IntWrapper(120)));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -197,7 +199,8 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
- upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100));
+ upsertMetadataManager.addRecord(segment2,
+ new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, new IntWrapper(100)));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -207,7 +210,8 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
- upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100));
+ upsertMetadataManager.addRecord(segment2,
+ new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, new IntWrapper(100)));
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
@@ -234,14 +238,14 @@ public class PartitionUpsertMetadataManagerTest {
// segment1: 0 -> {0, 100}, 1 -> {1, 100}
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
- recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
+ recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new ArrayList<>();
- recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 0, 100));
- recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 1, 100));
+ recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 0, new IntWrapper(100)));
+ recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 1, new IntWrapper(100)));
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
@@ -274,4 +278,37 @@ public class PartitionUpsertMetadataManagerTest {
((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, UpsertConfig.HashFunction.MURMUR3)).getBytes()),
"8d68b314cc0c8de4dbd55f4dad3c3e66");
}
+
+ /**
+ * Use a wrapper class to ensure different value has different reference.
+ */
+ private static class IntWrapper implements Comparable<IntWrapper> {
+ final int _value;
+
+ IntWrapper(int value) {
+ _value = value;
+ }
+
+ @Override
+ public int compareTo(IntWrapper o) {
+ return Integer.compare(_value, o._value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IntWrapper)) {
+ return false;
+ }
+ IntWrapper that = (IntWrapper) o;
+ return _value == that._value;
+ }
+
+ @Override
+ public int hashCode() {
+ return _value;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org