You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/10/30 17:59:50 UTC
[incubator-pinot] 01/01: [Upsert] Preserve the newer added record when 2 records have the same timestamp
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch upsert_test
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ac9d7f6f639118acd8b3322b9780912999fc03bb
Author: Xiaotian (Jackie) Jiang <ja...@gmail.com>
AuthorDate: Thu Oct 29 19:25:16 2020 -0700
[Upsert] Preserve the newer added record when 2 records have the same timestamp
---
.../apache/pinot/common/utils/LLCSegmentName.java | 14 +++-
.../upsert/PartitionUpsertMetadataManager.java | 54 +++++++-------
.../upsert/PartitionUpsertMetadataManagerTest.java | 84 +++++++++++-----------
3 files changed, 85 insertions(+), 67 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index 29824f1..adc24ad 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.utils;
+import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -35,11 +36,11 @@ public class LLCSegmentName extends SegmentName implements Comparable {
throw new RuntimeException(segmentName + " is not a Low level consumer segment name");
}
- String[] parts = segmentName.split(SEPARATOR);
_segmentName = segmentName;
+ String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
_tableName = parts[0];
- _partitionId = Integer.valueOf(parts[1]);
- _sequenceNumber = Integer.valueOf(parts[2]);
+ _partitionId = Integer.parseInt(parts[1]);
+ _sequenceNumber = Integer.parseInt(parts[2]);
_creationTime = parts[3];
}
@@ -56,6 +57,13 @@ public class LLCSegmentName extends SegmentName implements Comparable {
_segmentName = tableName + SEPARATOR + partitionId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
}
+ /**
+ * Returns the sequence number of the given segment name.
+ */
+ public static int getSequenceNumber(String segmentName) {
+ return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName, SEPARATOR)[2]);
+ }
+
@Override
public RealtimeSegmentType getSegmentType() {
return RealtimeSegmentType.LLC;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
index 24acbab..3912c2c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.slf4j.Logger;
@@ -32,7 +33,10 @@ import org.slf4j.LoggerFactory;
/**
* Manages the upsert metadata per partition.
- * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>For multiple records with the same timestamp, the manager will preserve the latest record based on the sequence
+ * number of the segment. If 2 records with the same timestamp are in the same segment, the one with larger doc id will
+ * be preserved. Note that for tables with sorted column, the records will be re-ordered when committing the segment,
+ * and we will use the re-ordered doc ids instead of the ingestion order to decide which record to preserve.
* <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
* operation is done:
* <ul>
@@ -83,34 +87,35 @@ public class PartitionUpsertMetadataManager {
if (segmentName.equals(currentRecordLocation.getSegmentName())) {
// The current record location has the same segment name
- if (validDocIds == currentRecordLocation.getValidDocIds()) {
- // The current record location is pointing to the new segment being loaded
-
- // Update the record location when getting a newer timestamp
- if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+ // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+ // There are 2 scenarios:
+ // 1. The current record location is pointing to the same segment (the segment being added). In this case,
+ // we want to update the record location when there is a tie to keep the newer record. Note that the
+ // record info iterator will return records with incremental doc ids.
+ // 2. The current record location is pointing to the old segment being replaced. This could happen when
+ // committing a consuming segment, or reloading a completed segment. In this case, we want to update
+ // the record location when there is a tie because the record locations should point to the new added
+ // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+ // segment because it has not been replaced yet.
+ if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+ // Only update the valid doc ids for the new segment
+ if (validDocIds == currentRecordLocation.getValidDocIds()) {
validDocIds.remove(currentRecordLocation.getDocId());
- validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
}
+ validDocIds.add(recordInfo._docId);
+ return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
} else {
- // The current record location is pointing to the old segment being replaced. This could happen when
- // committing a consuming segment, or reloading a completed segment.
-
- // Update the record location when the new timestamp is greater than or equal to the current timestamp.
- // Update the record location when there is a tie because the record locations should point to the new
- // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
- // segment because it has not been replaced yet.
- if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
- validDocIds.add(recordInfo._docId);
- return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
- }
+ return currentRecordLocation;
}
- return currentRecordLocation;
} else {
// The current record location is pointing to a different segment
- // Update the record location when getting a newer timestamp
- if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+ // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
+ // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
+ if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
+ recordInfo._timestamp == currentRecordLocation.getTimestamp()
+ && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
+ .getSequenceNumber(currentRecordLocation.getSegmentName()))) {
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
@@ -139,8 +144,9 @@ public class PartitionUpsertMetadataManager {
if (currentRecordLocation != null) {
// Existing primary key
- // Update the record location when getting a newer timestamp
- if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+ // Update the record location when the new timestamp is greater than or equal to the current timestamp. Update
+ // the record location when there is a tie to keep the newer record.
+ if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
index d1cd679..a2bde99 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
@@ -22,9 +22,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager.RecordInfo;
import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -35,17 +37,17 @@ import static org.testng.Assert.assertSame;
public class PartitionUpsertMetadataManagerTest {
- private static final String SEGMENT_PREFIX = "testSegment";
- private static final String REALTIME_TEST_TABLE = "testTable_REALTIME";
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@Test
public void testAddSegment() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
- String segment1 = SEGMENT_PREFIX + 1;
+ String segment1 = getSegmentName(1);
List<RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -55,14 +57,14 @@ public class PartitionUpsertMetadataManagerTest {
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, 100));
ThreadSafeMutableRoaringBitmap validDocIds1 =
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
- // segment1: 0 -> {0, 100}, 1 -> {4, 120}, 2 -> {2, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
- assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 4});
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
// Add the second segment
- String segment2 = SEGMENT_PREFIX + 2;
+ String segment2 = getSegmentName(2);
List<RecordInfo> recordInfoList2 = new ArrayList<>();
recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList2.add(new RecordInfo(getPrimaryKey(1), 1, 100));
@@ -71,45 +73,47 @@ public class PartitionUpsertMetadataManagerTest {
recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 4, 80));
ThreadSafeMutableRoaringBitmap validDocIds2 =
upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
- // segment1: 0 -> {0, 100}, 1 -> {4, 120}
- // segment2: 2 -> {2, 120}, 3 -> {3, 80}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // segment1: 1 -> {4, 120}
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
- assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
// Replace (reload) the first segment
ThreadSafeMutableRoaringBitmap newValidDocIds1 =
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
- // original segment1: 0 -> {0, 100}, 1 -> {4, 120}
- // segment2: 2 -> {2, 120}, 3 -> {3, 80}
- // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // original segment1: 1 -> {4, 120}
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // new segment1: 1 -> {4, 120}
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
- assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
- assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
- assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), newValidDocIds1);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), newValidDocIds1);
// Remove the original segment1
upsertMetadataManager.removeSegment(segment1, validDocIds1);
- // segment2: 2 -> {2, 120}, 3 -> {3, 80}
- // new segment1: 0 -> {0, 100}, 1 -> {4, 120}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // new segment1: 1 -> {4, 120}
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
- assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
- assertSame(recordLocationMap.get(getPrimaryKey(0)).getValidDocIds(), newValidDocIds1);
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertSame(recordLocationMap.get(getPrimaryKey(1)).getValidDocIds(), newValidDocIds1);
}
+ private static String getSegmentName(int sequenceNumber) {
+ return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, System.currentTimeMillis()).toString();
+ }
+
private static PrimaryKey getPrimaryKey(int value) {
return new PrimaryKey(new Object[]{value});
}
@@ -126,12 +130,12 @@ public class PartitionUpsertMetadataManagerTest {
@Test
public void testUpdateRecord() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
- String segment1 = SEGMENT_PREFIX + 1;
+ String segment1 = getSegmentName(1);
List<RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 120));
@@ -140,7 +144,7 @@ public class PartitionUpsertMetadataManagerTest {
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
// Update records from the second segment
- String segment2 = SEGMENT_PREFIX + 2;
+ String segment2 = getSegmentName(2);
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
upsertMetadataManager.updateRecord(segment2, new RecordInfo(getPrimaryKey(3), 0, 100), validDocIds2);
@@ -174,32 +178,32 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
upsertMetadataManager.updateRecord(segment2, new RecordInfo(getPrimaryKey(0), 3, 100), validDocIds2);
- // segment1: 0 -> {0, 100}, 1 -> {1, 120}
- // segment2: 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+ // segment1: 1 -> {1, 120}
+ // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
+ checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
- assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
}
@Test
public void testRemoveSegment() {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TEST_TABLE, 0, Mockito.mock(ServerMetrics.class));
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add 2 segments
// segment1: 0 -> {0, 100}, 1 -> {1, 100}
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
- String segment1 = SEGMENT_PREFIX + 1;
+ String segment1 = getSegmentName(1);
List<RecordInfo> recordInfoList1 = new ArrayList<>();
recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
ThreadSafeMutableRoaringBitmap validDocIds1 =
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
- String segment2 = SEGMENT_PREFIX + 1;
+ String segment2 = getSegmentName(2);
List<RecordInfo> recordInfoList2 = new ArrayList<>();
recordInfoList2.add(new RecordInfo(getPrimaryKey(2), 0, 100));
recordInfoList2.add(new RecordInfo(getPrimaryKey(3), 1, 100));
@@ -211,8 +215,8 @@ public class PartitionUpsertMetadataManagerTest {
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
assertNull(recordLocationMap.get(getPrimaryKey(0)));
assertNull(recordLocationMap.get(getPrimaryKey(1)));
- checkRecordLocation(recordLocationMap, 2, segment1, 0, 100);
- checkRecordLocation(recordLocationMap, 3, segment1, 1, 100);
+ checkRecordLocation(recordLocationMap, 2, segment2, 0, 100);
+ checkRecordLocation(recordLocationMap, 3, segment2, 1, 100);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org