You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/10/30 17:59:50 UTC

[incubator-pinot] 01/01: [Upsert] Preserve the newer added record when 2 records have the same timestamp

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch upsert_test
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ac9d7f6f639118acd8b3322b9780912999fc03bb
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Thu Oct 29 19:25:16 2020 -0700

    [Upsert] Preserve the newer added record when 2 records have the same timestamp
---
 .../apache/pinot/common/utils/LLCSegmentName.java  | 14 +++-
 .../upsert/PartitionUpsertMetadataManager.java     | 54 +++++++-------
 .../upsert/PartitionUpsertMetadataManagerTest.java | 84 +++++++++++-----------
 3 files changed, 85 insertions(+), 67 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index 29824f1..adc24ad 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils;
 
+import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -35,11 +36,11 @@ public class LLCSegmentName extends SegmentName implements Comparable {
       throw new RuntimeException(segmentName + " is not a Low level consumer segment name");
     }
 
-    String[] parts = segmentName.split(SEPARATOR);
     _segmentName = segmentName;
+    String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
     _tableName = parts[0];
-    _partitionId = Integer.valueOf(parts[1]);
-    _sequenceNumber = Integer.valueOf(parts[2]);
+    _partitionId = Integer.parseInt(parts[1]);
+    _sequenceNumber = Integer.parseInt(parts[2]);
     _creationTime = parts[3];
   }
 
@@ -56,6 +57,13 @@ public class LLCSegmentName extends SegmentName implements Comparable {
     _segmentName = tableName + SEPARATOR + partitionId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
   }
 
+  /**
+   * Returns the sequence number of the given segment name.
+   */
+  public static int getSequenceNumber(String segmentName) {
+    return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName, SEPARATOR)[2]);
+  }
+
   @Override
   public RealtimeSegmentType getSegmentType() {
     return RealtimeSegmentType.LLC;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
index 24acbab..3912c2c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.slf4j.Logger;
@@ -32,7 +33,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Manages the upsert metadata per partition.
- * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>For multiple records with the same timestamp, the manager will preserve the latest record based on the sequence
+ * number of the segment. If 2 records with the same timestamp are in the same segment, the one with larger doc id will
+ * be preserved. Note that for tables with a sorted column, the records will be re-ordered when committing the segment,
+ * and we will use the re-ordered doc ids instead of the ingestion order to decide which record to preserve.
  * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
  * operation is done:
  * <ul>
@@ -83,34 +87,35 @@ public class PartitionUpsertMetadataManager {
           if (segmentName.equals(currentRecordLocation.getSegmentName())) {
             // The current record location has the same segment name
 
-            if (validDocIds == currentRecordLocation.getValidDocIds()) {
-              // The current record location is pointing to the new segment being loaded
-
-              // Update the record location when getting a newer timestamp
-              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+            // There are 2 scenarios:
+            //   1. The current record location is pointing to the same segment (the segment being added). In this case,
+            //      we want to update the record location when there is a tie to keep the newer record. Note that the
+            //      record info iterator will return records with incremental doc ids.
+            //   2. The current record location is pointing to the old segment being replaced. This could happen when
+            //      committing a consuming segment, or reloading a completed segment. In this case, we want to update
+            //      the record location when there is a tie because the record locations should point to the newly added
+            //      segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+            //      segment because it has not been replaced yet.
+            if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+              // Only update the valid doc ids for the new segment
+              if (validDocIds == currentRecordLocation.getValidDocIds()) {
                 validDocIds.remove(currentRecordLocation.getDocId());
-                validDocIds.add(recordInfo._docId);
-                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
               }
+              validDocIds.add(recordInfo._docId);
+              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
             } else {
-              // The current record location is pointing to the old segment being replaced. This could happen when
-              // committing a consuming segment, or reloading a completed segment.
-
-              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
-              // Update the record location when there is a tie because the record locations should point to the new
-              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
-              // segment because it has not been replaced yet.
-              if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-                validDocIds.add(recordInfo._docId);
-                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
-              }
+              return currentRecordLocation;
             }
-            return currentRecordLocation;
           } else {
             // The current record location is pointing to a different segment
 
-            // Update the record location when getting a newer timestamp
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
+            // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
+            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
+                recordInfo._timestamp == currentRecordLocation.getTimestamp()
+                    && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
+                    .getSequenceNumber(currentRecordLocation.getSegmentName()))) {
               currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
               return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
@@ -139,8 +144,9 @@ public class PartitionUpsertMetadataManager {
       if (currentRecordLocation != null) {
         // Existing primary key
 
-        // Update the record location when getting a newer timestamp
-        if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+        // Update the record location when the new timestamp is greater than or equal to the current timestamp. Update
+        // the record location when there is a tie to keep the newer record.
+        if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
           currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
           validDocIds.add(recordInfo._docId);
           return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
index d1cd679..a2bde99 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager.RecordInfo;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -35,17 +37,17 @@ import static org.testng.Assert.assertSame;
 
 
 public class PartitionUpsertMetadataManagerTest {
-  private static final String SEGMENT_PREFIX = "testSegment";
-  private static final String REALTIME_TEST_TABLE = "testTable_REALTIME";
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
   @Test
   public void testAddSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -55,14 +57,14 @@ public class PartitionUpsertMetadataManagerTest {
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, 100));
     ThreadSafeMutableRoaringBitmap validDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    // segment1: 0 -> {0, 100}, 1 -> {4, 120}, 2 -> {2, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 4});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
 
     // Add the second segment
-    String segment2 = SEGMENT_PREFIX + 2;
+    String segment2 = getSegmentName(2);
     List<RecordInfo> recordInfoList2 = new ArrayList<>();
     recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList2.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -71,45 +73,47 @@ public class PartitionUpsertMetadataManagerTest {
     recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 4, 80));
     ThreadSafeMutableRoaringBitmap validDocIds2 =
         upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
-    // segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
 
     // Replace (reload) the first segment
     ThreadSafeMutableRoaringBitmap newValidDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    // original segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // original segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
-    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
-    assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), newValidDocIds1);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), newValidDocIds1);
 
     // Remove the original segment1
     upsertMetadataManager.removeSegment(segment1, validDocIds1);
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
-    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
-    assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), newValidDocIds1);
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), newValidDocIds1);
   }
 
+  private static String getSegmentName(int sequenceNumber) {
+    return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, System.currentTimeMillis()).toString();
+  }
+
   private static PrimaryKey getPrimaryKey(int value) {
     return new PrimaryKey(new Object[]{value});
   }
@@ -126,12 +130,12 @@ public class PartitionUpsertMetadataManagerTest {
   @Test
   public void testUpdateRecord() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 120));
@@ -140,7 +144,7 @@ public class PartitionUpsertMetadataManagerTest {
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
 
     // Update records from the second segment
-    String segment2 = SEGMENT_PREFIX + 2;
+    String segment2 = getSegmentName(2);
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
 
     upsertMetadataManager.updateRecord(segment2, new RecordInfo(getPrimaryKey(3), 0, 100), validDocIds2);
@@ -174,32 +178,32 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
     upsertMetadataManager.updateRecord(segment2, new RecordInfo(getPrimaryKey(0), 3, 100), validDocIds2);
-    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
-    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 1 -> {1, 120}
+    // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
   }
 
   @Test
   public void testRemoveSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add 2 segments
     // segment1: 0 -> {0, 100}, 1 -> {1, 100}
     // segment2: 2 -> {0, 100}, 3 -> {1, 100}
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
     ThreadSafeMutableRoaringBitmap validDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    String segment2 = SEGMENT_PREFIX + 1;
+    String segment2 = getSegmentName(2);
     List<RecordInfo> recordInfoList2 = new ArrayList<>();
     recordInfoList2.add(new RecordInfo(getPrimaryKey(2), 0, 100));
     recordInfoList2.add(new RecordInfo(getPrimaryKey(3), 1, 100));
@@ -211,8 +215,8 @@ public class PartitionUpsertMetadataManagerTest {
     // segment2: 2 -> {0, 100}, 3 -> {1, 100}
     assertNull(recordLocationMap.get(getPrimaryKey(0)));
     assertNull(recordLocationMap.get(getPrimaryKey(1)));
-    checkRecordLocation(recordLocationMap, 2, segment1, 0, 100);
-    checkRecordLocation(recordLocationMap, 3, segment1, 1, 100);
+    checkRecordLocation(recordLocationMap, 2, segment2, 0, 100);
+    checkRecordLocation(recordLocationMap, 3, segment2, 1, 100);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org