You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original archive page) was not preserved in this text version.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/06 23:06:56 UTC
[pinot] branch master updated: When upserting new record, index the record before updating the upsert metadata (#7860)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 22b8969 When upserting new record, index the record before updating the upsert metadata (#7860)
22b8969 is described below
commit 22b8969d62eccbb4f028bfb5a847da0718584ea2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Dec 6 15:06:33 2021 -0800
When upserting new record, index the record before updating the upsert metadata (#7860)
---
.../indexsegment/mutable/MutableSegmentImpl.java | 17 ++---
.../upsert/PartitionUpsertMetadataManager.java | 77 ++++++++++++----------
.../upsert/PartitionUpsertMetadataManagerTest.java | 24 +++----
3 files changed, 62 insertions(+), 56 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 31c65da..5247462 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -472,12 +472,14 @@ public class MutableSegmentImpl implements MutableSegment {
throws IOException {
boolean canTakeMore;
if (isUpsertEnabled()) {
- row = handleUpsert(row, _numDocsIndexed);
-
- updateDictionary(row);
- addNewRow(row);
- // Update number of documents indexed at last to make the latest row queryable
+ PartitionUpsertMetadataManager.RecordInfo recordInfo = getRecordInfo(row, _numDocsIndexed);
+ GenericRow updatedRow = _partitionUpsertMetadataManager.updateRecord(row, recordInfo);
+ updateDictionary(updatedRow);
+ addNewRow(updatedRow);
+ // Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
+ // once validated
canTakeMore = _numDocsIndexed++ < _capacity;
+ _partitionUpsertMetadataManager.addRecord(this, recordInfo);
} else {
// Update dictionary first
updateDictionary(row);
@@ -511,13 +513,12 @@ public class MutableSegmentImpl implements MutableSegment {
return _upsertMode != UpsertConfig.Mode.NONE;
}
- private GenericRow handleUpsert(GenericRow row, int docId) {
+ private PartitionUpsertMetadataManager.RecordInfo getRecordInfo(GenericRow row, int docId) {
PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
Preconditions.checkState(upsertComparisonValue instanceof Comparable,
"Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
- return _partitionUpsertMetadataManager.updateRecord(this,
- new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue), row);
+ return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
}
private void updateDictionary(GenericRow row) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 351d36a..82df26e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -79,8 +79,6 @@ public class PartitionUpsertMetadataManager {
// Reused for reading previous record during partial upsert
private final GenericRow _reuse = new GenericRow();
- // Stores the result of updateRecord()
- private GenericRow _result;
public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics,
@Nullable PartialUpsertHandler partialUpsertHandler, UpsertConfig.HashFunction hashFunction) {
@@ -163,26 +161,10 @@ public class PartitionUpsertMetadataManager {
}
/**
- * Updates the upsert metadata for a new consumed record in the given consuming segment. Returns the merged record if
- * partial-upsert is enabled.
+ * Updates the upsert metadata for a new consumed record in the given consuming segment.
*/
- public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, GenericRow record) {
- // For partial-upsert, need to ensure all previous records are loaded before inserting new records.
- if (_partialUpsertHandler != null) {
- while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
- LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}",
- _tableNameWithType);
- try {
- //noinspection BusyWait
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
+ public void addRecord(IndexSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
- _result = record;
_primaryKeyToRecordLocationMap.compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
@@ -193,12 +175,6 @@ public class PartitionUpsertMetadataManager {
if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
IndexSegment currentSegment = currentRecordLocation.getSegment();
int currentDocId = currentRecordLocation.getDocId();
- if (_partialUpsertHandler != null) {
- // Partial upsert
- _reuse.clear();
- GenericRow previousRecord = currentSegment.getRecord(currentDocId, _reuse);
- _result = _partialUpsertHandler.merge(previousRecord, record);
- }
if (segment == currentSegment) {
validDocIds.replace(currentDocId, recordInfo._docId);
} else {
@@ -207,12 +183,6 @@ public class PartitionUpsertMetadataManager {
}
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
} else {
- if (_partialUpsertHandler != null) {
- LOGGER.warn(
- "Got late event for partial upsert: {} (current comparison value: {}, record comparison value: "
- + "{}), skipping updating the" + " record", record, currentRecordLocation.getComparisonValue(),
- recordInfo._comparisonValue);
- }
return currentRecordLocation;
}
} else {
@@ -224,7 +194,48 @@ public class PartitionUpsertMetadataManager {
// Update metrics
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
_primaryKeyToRecordLocationMap.size());
- return _result;
+ }
+
+ /**
+ * Returns the merged record when partial-upsert is enabled.
+ */
+ public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
+ // Directly return the record when partial-upsert is not enabled
+ if (_partialUpsertHandler == null) {
+ return record;
+ }
+
+ // Ensure all previous records are loaded before inserting new records
+ while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
+ LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
+ try {
+ //noinspection BusyWait
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ RecordLocation currentRecordLocation =
+ _primaryKeyToRecordLocationMap.get(hashPrimaryKey(recordInfo._primaryKey, _hashFunction));
+ if (currentRecordLocation != null) {
+ // Existing primary key
+ if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
+ _reuse.clear();
+ GenericRow previousRecord =
+ currentRecordLocation.getSegment().getRecord(currentRecordLocation.getDocId(), _reuse);
+ return _partialUpsertHandler.merge(previousRecord, record);
+ } else {
+ LOGGER.warn(
+ "Got late event for partial-upsert: {} (current comparison value: {}, record comparison value: {}), "
+ + "skipping updating the record", record, currentRecordLocation.getComparisonValue(),
+ recordInfo._comparisonValue);
+ return record;
+ }
+ } else {
+ // New primary key
+ return record;
+ }
}
/**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index b6aabac..960fc2a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -27,7 +27,6 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
@@ -153,13 +152,13 @@ public class PartitionUpsertMetadataManagerTest {
}
@Test
- public void testUpdateRecord() {
- verifyUpdateRecord(UpsertConfig.HashFunction.NONE);
- verifyUpdateRecord(UpsertConfig.HashFunction.MD5);
- verifyUpdateRecord(UpsertConfig.HashFunction.MURMUR3);
+ public void testAddRecord() {
+ verifyAddRecord(UpsertConfig.HashFunction.NONE);
+ verifyAddRecord(UpsertConfig.HashFunction.MD5);
+ verifyAddRecord(UpsertConfig.HashFunction.MURMUR3);
}
- private void verifyUpdateRecord(UpsertConfig.HashFunction hashFunction) {
+ private void verifyAddRecord(UpsertConfig.HashFunction hashFunction) {
PartitionUpsertMetadataManager upsertMetadataManager =
new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null, hashFunction);
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -178,9 +177,7 @@ public class PartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
IndexSegment segment2 = mockSegment(1, validDocIds2);
- GenericRow row = mock(GenericRow.class);
- upsertMetadataManager
- .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), row);
+ upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -190,8 +187,7 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
- upsertMetadataManager
- .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), row);
+ upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -201,8 +197,7 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
- upsertMetadataManager
- .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), row);
+ upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -212,8 +207,7 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
- upsertMetadataManager
- .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), row);
+ upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100));
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org