You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/10/30 19:27:22 UTC

[incubator-pinot] branch master updated: [Upsert] Preserve the newer added record when 2 records have the same timestamp (#6213)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8678f5e  [Upsert] Preserve the newer added record when 2 records have the same timestamp (#6213)
8678f5e is described below

commit 8678f5ec0a5a58a095b38d27b5ecda55efdde8ec
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Oct 30 12:27:04 2020 -0700

    [Upsert] Preserve the newer added record when 2 records have the same timestamp (#6213)
    
    For an upsert table, a record with a newer timestamp replaces the existing record with an older timestamp, but when multiple records have the same timestamp, which record is preserved is undefined in the current implementation.
    This PR enhances the PartitionUpsertMetadataManager to preserve the latest ingested record if multiple records have the same timestamp:
    - If 2 records are not in the same segment, preserve the one in the segment with the larger sequence number
    - If 2 records are in the same segment, preserve the one with the larger doc id
    
    Note that for tables with a sorted column, the records will be re-ordered when committing the segment, and we will use the re-ordered docIds instead of the ingestion order to decide which record to preserve.
---
 .../apache/pinot/common/utils/LLCSegmentName.java  | 14 +++-
 .../upsert/PartitionUpsertMetadataManager.java     | 54 +++++++-------
 .../upsert/PartitionUpsertMetadataManagerTest.java | 84 +++++++++++-----------
 3 files changed, 85 insertions(+), 67 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index 29824f1..adc24ad 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils;
 
+import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -35,11 +36,11 @@ public class LLCSegmentName extends SegmentName implements Comparable {
       throw new RuntimeException(segmentName + " is not a Low level consumer segment name");
     }
 
-    String[] parts = segmentName.split(SEPARATOR);
     _segmentName = segmentName;
+    String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
     _tableName = parts[0];
-    _partitionId = Integer.valueOf(parts[1]);
-    _sequenceNumber = Integer.valueOf(parts[2]);
+    _partitionId = Integer.parseInt(parts[1]);
+    _sequenceNumber = Integer.parseInt(parts[2]);
     _creationTime = parts[3];
   }
 
@@ -56,6 +57,13 @@ public class LLCSegmentName extends SegmentName implements Comparable {
     _segmentName = tableName + SEPARATOR + partitionId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
   }
 
+  /**
+   * Returns the sequence number of the given segment name.
+   */
+  public static int getSequenceNumber(String segmentName) {
+    return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName, SEPARATOR)[2]);
+  }
+
   @Override
   public RealtimeSegmentType getSegmentType() {
     return RealtimeSegmentType.LLC;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
index 24acbab..3912c2c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.slf4j.Logger;
@@ -32,7 +33,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Manages the upsert metadata per partition.
- * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>For multiple records with the same timestamp, the manager will preserve the latest record based on the sequence
+ * number of the segment. If 2 records with the same timestamp are in the same segment, the one with larger doc id will
+ * be preserved. Note that for tables with sorted column, the records will be re-ordered when committing the segment,
+ * and we will use the re-ordered doc ids instead of the ingestion order to decide which record to preserve.
  * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
  * operation is done:
  * <ul>
@@ -83,34 +87,35 @@ public class PartitionUpsertMetadataManager {
           if (segmentName.equals(currentRecordLocation.getSegmentName())) {
             // The current record location has the same segment name
 
-            if (validDocIds == currentRecordLocation.getValidDocIds()) {
-              // The current record location is pointing to the new segment being loaded
-
-              // Update the record location when getting a newer timestamp
-              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+            // There are 2 scenarios:
+            //   1. The current record location is pointing to the same segment (the segment being added). In this case,
+            //      we want to update the record location when there is a tie to keep the newer record. Note that the
+            //      record info iterator will return records with incremental doc ids.
+            //   2. The current record location is pointing to the old segment being replaced. This could happen when
+            //      committing a consuming segment, or reloading a completed segment. In this case, we want to update
+            //      the record location when there is a tie because the record locations should point to the new added
+            //      segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+            //      segment because it has not been replaced yet.
+            if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+              // Only update the valid doc ids for the new segment
+              if (validDocIds == currentRecordLocation.getValidDocIds()) {
                 validDocIds.remove(currentRecordLocation.getDocId());
-                validDocIds.add(recordInfo._docId);
-                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
               }
+              validDocIds.add(recordInfo._docId);
+              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
             } else {
-              // The current record location is pointing to the old segment being replaced. This could happen when
-              // committing a consuming segment, or reloading a completed segment.
-
-              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
-              // Update the record location when there is a tie because the record locations should point to the new
-              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
-              // segment because it has not been replaced yet.
-              if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-                validDocIds.add(recordInfo._docId);
-                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
-              }
+              return currentRecordLocation;
             }
-            return currentRecordLocation;
           } else {
             // The current record location is pointing to a different segment
 
-            // Update the record location when getting a newer timestamp
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
+            // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
+            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
+                recordInfo._timestamp == currentRecordLocation.getTimestamp()
+                    && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
+                    .getSequenceNumber(currentRecordLocation.getSegmentName()))) {
               currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
               return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
@@ -139,8 +144,9 @@ public class PartitionUpsertMetadataManager {
       if (currentRecordLocation != null) {
         // Existing primary key
 
-        // Update the record location when getting a newer timestamp
-        if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+        // Update the record location when the new timestamp is greater than or equal to the current timestamp. Update
+        // the record location when there is a tie to keep the newer record.
+        if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
           currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
           validDocIds.add(recordInfo._docId);
           return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
index d1cd679..a2bde99 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager.RecordInfo;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -35,17 +37,17 @@ import static org.testng.Assert.assertSame;
 
 
 public class PartitionUpsertMetadataManagerTest {
-  private static final String SEGMENT_PREFIX = "testSegment";
-  private static final String REALTIME_TEST_TABLE = "testTable_REALTIME";
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
   @Test
   public void testAddSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -55,14 +57,14 @@ public class PartitionUpsertMetadataManagerTest {
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, 100));
     ThreadSafeMutableRoaringBitmap validDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    // segment1: 0 -> {0, 100}, 1 -> {4, 120}, 2 -> {2, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 4});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
 
     // Add the second segment
-    String segment2 = SEGMENT_PREFIX + 2;
+    String segment2 = getSegmentName(2);
     List<RecordInfo> recordInfoList2 = new ArrayList<>();
     recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList2.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -71,45 +73,47 @@ public class PartitionUpsertMetadataManagerTest {
     recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 4, 80));
     ThreadSafeMutableRoaringBitmap validDocIds2 =
         upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
-    // segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
 
     // Replace (reload) the first segment
     ThreadSafeMutableRoaringBitmap newValidDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    // original segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // original segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
-    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
-    assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), newValidDocIds1);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), newValidDocIds1);
 
     // Remove the original segment1
     upsertMetadataManager.removeSegment(segment1, validDocIds1);
-    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
-    // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
-    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
-    assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), newValidDocIds1);
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), newValidDocIds1);
   }
 
+  private static String getSegmentName(int sequenceNumber) {
+    return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, System.currentTimeMillis()).toString();
+  }
+
   private static PrimaryKey getPrimaryKey(int value) {
     return new PrimaryKey(new Object[]{value});
   }
@@ -126,12 +130,12 @@ public class PartitionUpsertMetadataManagerTest {
   @Test
   public void testUpdateRecord() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 120));
@@ -140,7 +144,7 @@ public class PartitionUpsertMetadataManagerTest {
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
 
     // Update records from the second segment
-    String segment2 = SEGMENT_PREFIX + 2;
+    String segment2 = getSegmentName(2);
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
 
     upsertMetadataManager.updateRecord(segment2, new RecordInfo(getPrimaryKey(3), 0, 100), validDocIds2);
@@ -174,32 +178,32 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
     upsertMetadataManager.updateRecord(segment2, new RecordInfo(getPrimaryKey(0), 3, 100), validDocIds2);
-    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
-    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    // segment1: 1 -> {1, 120}
+    // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
     checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
     checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
     checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
-    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
   }
 
   @Test
   public void testRemoveSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
     Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add 2 segments
     // segment1: 0 -> {0, 100}, 1 -> {1, 100}
     // segment2: 2 -> {0, 100}, 3 -> {0, 100}
-    String segment1 = SEGMENT_PREFIX + 1;
+    String segment1 = getSegmentName(1);
     List<RecordInfo> recordInfoList1 = new ArrayList<>();
     recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
     recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
     ThreadSafeMutableRoaringBitmap validDocIds1 =
         upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
-    String segment2 = SEGMENT_PREFIX + 1;
+    String segment2 = getSegmentName(2);
     List<RecordInfo> recordInfoList2 = new ArrayList<>();
     recordInfoList2.add(new RecordInfo(getPrimaryKey(2), 0, 100));
     recordInfoList2.add(new RecordInfo(getPrimaryKey(3), 1, 100));
@@ -211,8 +215,8 @@ public class PartitionUpsertMetadataManagerTest {
     // segment2: 2 -> {0, 100}, 3 -> {0, 100}
     assertNull(recordLocationMap.get(getPrimaryKey(0)));
     assertNull(recordLocationMap.get(getPrimaryKey(1)));
-    checkRecordLocation(recordLocationMap, 2, segment1, 0, 100);
-    checkRecordLocation(recordLocationMap, 3, segment1, 1, 100);
+    checkRecordLocation(recordLocationMap, 2, segment2, 0, 100);
+    checkRecordLocation(recordLocationMap, 3, segment2, 1, 100);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org