You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/26 19:28:20 UTC

[pinot] branch master updated: Fix the upsert metadata bug when adding segment with same comparison value (#8590)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d6ac065c1 Fix the upsert metadata bug when adding segment with same comparison value (#8590)
1d6ac065c1 is described below

commit 1d6ac065c11efc3907b12b075a1976142cb34dbf
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Apr 26 12:28:13 2022 -0700

    Fix the upsert metadata bug when adding segment with same comparison value (#8590)
    
    Fix the bug in `PartitionUpsertMetadataManager.addSegment()` where comparison values were checked for equality by reference (`==`) instead of by value (`compareTo()`)
---
 .../upsert/PartitionUpsertMetadataManager.java     | 17 ++---
 .../upsert/PartitionUpsertMetadataManagerTest.java | 85 ++++++++++++++++------
 2 files changed, 69 insertions(+), 33 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 6ba710f0f0..e416c3c0ca 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -103,13 +103,14 @@ public class PartitionUpsertMetadataManager {
           (primaryKey, currentRecordLocation) -> {
             if (currentRecordLocation != null) {
               // Existing primary key
+              IndexSegment currentSegment = currentRecordLocation.getSegment();
+              int comparisonResult = recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue());
 
               // The current record is in the same segment
               // Update the record location when there is a tie to keep the newer record. Note that the record info
               // iterator will return records with incremental doc ids.
-              IndexSegment currentSegment = currentRecordLocation.getSegment();
               if (segment == currentSegment) {
-                if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
+                if (comparisonResult >= 0) {
                   validDocIds.replace(currentRecordLocation.getDocId(), recordInfo._docId);
                   return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
                 } else {
@@ -124,7 +125,7 @@ public class PartitionUpsertMetadataManager {
               // doc ids for the old segment because it has not been replaced yet.
               String currentSegmentName = currentSegment.getSegmentName();
               if (segmentName.equals(currentSegmentName)) {
-                if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
+                if (comparisonResult >= 0) {
                   validDocIds.add(recordInfo._docId);
                   return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
                 } else {
@@ -136,12 +137,10 @@ public class PartitionUpsertMetadataManager {
               // Update the record location when getting a newer comparison value, or the value is the same as the
               // current value, but the segment has a larger sequence number (the segment is newer than the current
               // segment).
-              if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) > 0 || (
-                  recordInfo._comparisonValue == currentRecordLocation.getComparisonValue()
-                      && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                      && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
-                      && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
-                      currentSegmentName))) {
+              if (comparisonResult > 0 || (comparisonResult == 0 && LLCSegmentName.isLowLevelConsumerSegmentName(
+                  segmentName) && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
+                  && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
+                  currentSegmentName))) {
                 Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentRecordLocation.getDocId());
                 validDocIds.add(recordInfo._docId);
                 return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index a4d02cea17..80445d9987 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -60,12 +60,12 @@ public class PartitionUpsertMetadataManagerTest {
 
     // Add the first segment
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 100));
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 80));
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 4, 120));
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 5, 100));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(100)));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, new IntWrapper(80)));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 4, new IntWrapper(120)));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 5, new IntWrapper(100)));
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
     upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
@@ -77,11 +77,11 @@ public class PartitionUpsertMetadataManagerTest {
 
     // Add the second segment
     ArrayList<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new ArrayList<>();
-    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
-    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
-    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 120));
-    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 3, 80));
-    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 4, 80));
+    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
+    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
+    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(120)));
+    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 3, new IntWrapper(80)));
+    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 4, new IntWrapper(80)));
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
     upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
@@ -142,13 +142,13 @@ public class PartitionUpsertMetadataManagerTest {
   }
 
   private static void checkRecordLocation(Map<Object, RecordLocation> recordLocationMap, int keyValue,
-      IndexSegment segment, int docId, int timestamp, UpsertConfig.HashFunction hashFunction) {
+      IndexSegment segment, int docId, int comparisonValue, UpsertConfig.HashFunction hashFunction) {
     RecordLocation recordLocation =
         recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(keyValue), hashFunction));
     assertNotNull(recordLocation);
     assertSame(recordLocation.getSegment(), segment);
     assertEquals(recordLocation.getDocId(), docId);
-    assertEquals(recordLocation.getComparisonValue(), timestamp);
+    assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue);
   }
 
   @Test
@@ -166,9 +166,9 @@ public class PartitionUpsertMetadataManagerTest {
     // Add the first segment
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 120));
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, 100));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(120)));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 2, new IntWrapper(100)));
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
     upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
@@ -177,7 +177,8 @@ public class PartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     IndexSegment segment2 = mockSegment(1, validDocIds2);
 
-    upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100));
+    upsertMetadataManager.addRecord(segment2,
+        new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, new IntWrapper(100)));
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -187,7 +188,8 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
 
-    upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120));
+    upsertMetadataManager.addRecord(segment2,
+        new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, new IntWrapper(120)));
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -197,7 +199,8 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
-    upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100));
+    upsertMetadataManager.addRecord(segment2,
+        new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, new IntWrapper(100)));
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -207,7 +210,8 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
-    upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100));
+    upsertMetadataManager.addRecord(segment2,
+        new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, new IntWrapper(100)));
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
@@ -234,14 +238,14 @@ public class PartitionUpsertMetadataManagerTest {
     // segment1: 0 -> {0, 100}, 1 -> {1, 100}
     // segment2: 2 -> {0, 100}, 3 -> {0, 100}
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new ArrayList<>();
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, 100));
-    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, 100));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 0, new IntWrapper(100)));
+    recordInfoList1.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 1, new IntWrapper(100)));
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
     upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
     List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList2 = new ArrayList<>();
-    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 0, 100));
-    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 1, 100));
+    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 0, new IntWrapper(100)));
+    recordInfoList2.add(new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 1, new IntWrapper(100)));
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment2 = mockSegment(2, validDocIds2);
     upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
@@ -274,4 +278,37 @@ public class PartitionUpsertMetadataManagerTest {
         ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk, UpsertConfig.HashFunction.MURMUR3)).getBytes()),
         "8d68b314cc0c8de4dbd55f4dad3c3e66");
   }
+
+  /**
+   * Use a wrapper class to ensure that equal values are held by distinct object references.
+   */
+  private static class IntWrapper implements Comparable<IntWrapper> {
+    final int _value;
+
+    IntWrapper(int value) {
+      _value = value;
+    }
+
+    @Override
+    public int compareTo(IntWrapper o) {
+      return Integer.compare(_value, o._value);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof IntWrapper)) {
+        return false;
+      }
+      IntWrapper that = (IntWrapper) o;
+      return _value == that._value;
+    }
+
+    @Override
+    public int hashCode() {
+      return _value;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org