You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/06 23:06:56 UTC

[pinot] branch master updated: When upserting new record, index the record before updating the upsert metadata (#7860)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 22b8969  When upserting new record, index the record before updating the upsert metadata (#7860)
22b8969 is described below

commit 22b8969d62eccbb4f028bfb5a847da0718584ea2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Dec 6 15:06:33 2021 -0800

    When upserting new record, index the record before updating the upsert metadata (#7860)
---
 .../indexsegment/mutable/MutableSegmentImpl.java   | 17 ++---
 .../upsert/PartitionUpsertMetadataManager.java     | 77 ++++++++++++----------
 .../upsert/PartitionUpsertMetadataManagerTest.java | 24 +++----
 3 files changed, 62 insertions(+), 56 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 31c65da..5247462 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -472,12 +472,14 @@ public class MutableSegmentImpl implements MutableSegment {
       throws IOException {
     boolean canTakeMore;
     if (isUpsertEnabled()) {
-      row = handleUpsert(row, _numDocsIndexed);
-
-      updateDictionary(row);
-      addNewRow(row);
-      // Update number of documents indexed at last to make the latest row queryable
+      PartitionUpsertMetadataManager.RecordInfo recordInfo = getRecordInfo(row, _numDocsIndexed);
+      GenericRow updatedRow = _partitionUpsertMetadataManager.updateRecord(row, recordInfo);
+      updateDictionary(updatedRow);
+      addNewRow(updatedRow);
+      // Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
+      // once validated
       canTakeMore = _numDocsIndexed++ < _capacity;
+      _partitionUpsertMetadataManager.addRecord(this, recordInfo);
     } else {
       // Update dictionary first
       updateDictionary(row);
@@ -511,13 +513,12 @@ public class MutableSegmentImpl implements MutableSegment {
     return _upsertMode != UpsertConfig.Mode.NONE;
   }
 
-  private GenericRow handleUpsert(GenericRow row, int docId) {
+  private PartitionUpsertMetadataManager.RecordInfo getRecordInfo(GenericRow row, int docId) {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
     Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
     Preconditions.checkState(upsertComparisonValue instanceof Comparable,
         "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
-    return _partitionUpsertMetadataManager.updateRecord(this,
-        new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue), row);
+    return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
   }
 
   private void updateDictionary(GenericRow row) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 351d36a..82df26e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -79,8 +79,6 @@ public class PartitionUpsertMetadataManager {
 
   // Reused for reading previous record during partial upsert
   private final GenericRow _reuse = new GenericRow();
-  // Stores the result of updateRecord()
-  private GenericRow _result;
 
   public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics,
       @Nullable PartialUpsertHandler partialUpsertHandler, UpsertConfig.HashFunction hashFunction) {
@@ -163,26 +161,10 @@ public class PartitionUpsertMetadataManager {
   }
 
   /**
-   * Updates the upsert metadata for a new consumed record in the given consuming segment. Returns the merged record if
-   * partial-upsert is enabled.
+   * Updates the upsert metadata for a new consumed record in the given consuming segment.
    */
-  public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, GenericRow record) {
-    // For partial-upsert, need to ensure all previous records are loaded before inserting new records.
-    if (_partialUpsertHandler != null) {
-      while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
-        LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}",
-            _tableNameWithType);
-        try {
-          //noinspection BusyWait
-          Thread.sleep(1000L);
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
+  public void addRecord(IndexSegment segment, RecordInfo recordInfo) {
     ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
-    _result = record;
     _primaryKeyToRecordLocationMap.compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction),
         (primaryKey, currentRecordLocation) -> {
           if (currentRecordLocation != null) {
@@ -193,12 +175,6 @@ public class PartitionUpsertMetadataManager {
             if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
               IndexSegment currentSegment = currentRecordLocation.getSegment();
               int currentDocId = currentRecordLocation.getDocId();
-              if (_partialUpsertHandler != null) {
-                // Partial upsert
-                _reuse.clear();
-                GenericRow previousRecord = currentSegment.getRecord(currentDocId, _reuse);
-                _result = _partialUpsertHandler.merge(previousRecord, record);
-              }
               if (segment == currentSegment) {
                 validDocIds.replace(currentDocId, recordInfo._docId);
               } else {
@@ -207,12 +183,6 @@ public class PartitionUpsertMetadataManager {
               }
               return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
             } else {
-              if (_partialUpsertHandler != null) {
-                LOGGER.warn(
-                    "Got late event for partial upsert: {} (current comparison value: {}, record comparison value: "
-                        + "{}), skipping updating the" + " record", record, currentRecordLocation.getComparisonValue(),
-                    recordInfo._comparisonValue);
-              }
               return currentRecordLocation;
             }
           } else {
@@ -224,7 +194,48 @@ public class PartitionUpsertMetadataManager {
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         _primaryKeyToRecordLocationMap.size());
-    return _result;
+  }
+
+  /**
+   * Returns the merged record when partial-upsert is enabled.
+   */
+  public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
+    // Directly return the record when partial-upsert is not enabled
+    if (_partialUpsertHandler == null) {
+      return record;
+    }
+
+    // Ensure all previous records are loaded before inserting new records
+    while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
+      LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
+      try {
+        //noinspection BusyWait
+        Thread.sleep(1000L);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    RecordLocation currentRecordLocation =
+        _primaryKeyToRecordLocationMap.get(hashPrimaryKey(recordInfo._primaryKey, _hashFunction));
+    if (currentRecordLocation != null) {
+      // Existing primary key
+      if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
+        _reuse.clear();
+        GenericRow previousRecord =
+            currentRecordLocation.getSegment().getRecord(currentRecordLocation.getDocId(), _reuse);
+        return _partialUpsertHandler.merge(previousRecord, record);
+      } else {
+        LOGGER.warn(
+            "Got late event for partial-upsert: {} (current comparison value: {}, record comparison value: {}), "
+                + "skipping updating the record", record, currentRecordLocation.getComparisonValue(),
+            recordInfo._comparisonValue);
+        return record;
+      }
+    } else {
+      // New primary key
+      return record;
+    }
   }
 
   /**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index b6aabac..960fc2a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -27,7 +27,6 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
@@ -153,13 +152,13 @@ public class PartitionUpsertMetadataManagerTest {
   }
 
   @Test
-  public void testUpdateRecord() {
-    verifyUpdateRecord(UpsertConfig.HashFunction.NONE);
-    verifyUpdateRecord(UpsertConfig.HashFunction.MD5);
-    verifyUpdateRecord(UpsertConfig.HashFunction.MURMUR3);
+  public void testAddRecord() {
+    verifyAddRecord(UpsertConfig.HashFunction.NONE);
+    verifyAddRecord(UpsertConfig.HashFunction.MD5);
+    verifyAddRecord(UpsertConfig.HashFunction.MURMUR3);
   }
 
-  private void verifyUpdateRecord(UpsertConfig.HashFunction hashFunction) {
+  private void verifyAddRecord(UpsertConfig.HashFunction hashFunction) {
     PartitionUpsertMetadataManager upsertMetadataManager =
         new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null, hashFunction);
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -178,9 +177,7 @@ public class PartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     IndexSegment segment2 = mockSegment(1, validDocIds2);
 
-    GenericRow row = mock(GenericRow.class);
-    upsertMetadataManager
-        .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), row);
+    upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100));
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -190,8 +187,7 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
 
-    upsertMetadataManager
-        .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), row);
+    upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120));
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -201,8 +197,7 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
-    upsertMetadataManager
-        .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), row);
+    upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100));
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -212,8 +207,7 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
-    upsertMetadataManager
-        .updateRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), row);
+    upsertMetadataManager.addRecord(segment2, new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100));
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org