Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/10/21 05:44:23 UTC

[GitHub] [incubator-pinot] yupeng9 commented on a change in pull request #6167: Support reloading upsert table

yupeng9 commented on a change in pull request #6167:
URL: https://github.com/apache/incubator-pinot/pull/6167#discussion_r508991010



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.

Review comment:
       I am not sure this old-segment check is necessary:
    - Since the old segment is about to be replaced, shouldn't it be safe to update its valid doc ids? It will be gone anyway.
    - If so, the handling is identical to the branch above, so can the two branches be merged?
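
To make the question concrete, here is a minimal, self-contained sketch of the branching in `addSegment`. It is not the PR's code: `UpsertTieSketch` and `Location` are hypothetical names, `java.util.BitSet` stands in for `ThreadSafeMutableRoaringBitmap`, and primary keys are plain integers. It only mirrors the comparisons visible in the diff (`>` against the segment being loaded, `>=` against the segment being replaced).

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

class UpsertTieSketch {
  /** Hypothetical stand-in for RecordLocation; BitSet replaces the roaring bitmap. */
  static final class Location {
    final String segmentName;
    final int docId;
    final long timestamp;
    final BitSet validDocIds;

    Location(String segmentName, int docId, long timestamp, BitSet validDocIds) {
      this.segmentName = segmentName;
      this.docId = docId;
      this.timestamp = timestamp;
      this.validDocIds = validDocIds;
    }
  }

  final Map<Integer, Location> locations = new HashMap<>();

  /** Each record is {primaryKey, docId, timestamp}. Returns the new segment's valid doc ids. */
  BitSet addSegment(String segmentName, int[][] records) {
    BitSet validDocIds = new BitSet();
    for (int[] record : records) {
      int docId = record[1];
      long timestamp = record[2];
      locations.compute(record[0], (key, current) -> {
        if (current == null) {
          // New primary key
          validDocIds.set(docId);
          return new Location(segmentName, docId, timestamp, validDocIds);
        }
        if (segmentName.equals(current.segmentName)) {
          if (current.validDocIds == validDocIds) {
            // Same segment instance: strictly newer timestamp wins
            if (timestamp > current.timestamp) {
              validDocIds.clear(current.docId);
              validDocIds.set(docId);
              return new Location(segmentName, docId, timestamp, validDocIds);
            }
          } else if (timestamp >= current.timestamp) {
            // Old instance being replaced: ties move to the new instance,
            // and the old bitmap is deliberately left untouched
            validDocIds.set(docId);
            return new Location(segmentName, docId, timestamp, validDocIds);
          }
          return current;
        }
        // Different segment: strictly newer timestamp wins
        if (timestamp > current.timestamp) {
          current.validDocIds.clear(current.docId);
          validDocIds.set(docId);
          return new Location(segmentName, docId, timestamp, validDocIds);
        }
        return current;
      });
    }
    return validDocIds;
  }

  public static void main(String[] args) {
    int[][] records = {{0, 0, 100}, {1, 1, 100}, {2, 2, 100}, {0, 3, 80}, {1, 4, 120}, {0, 5, 100}};
    UpsertTieSketch sketch = new UpsertTieSketch();
    BitSet first = sketch.addSegment("testSegment1", records);
    BitSet reloaded = sketch.addSegment("testSegment1", records);
    System.out.println("old=" + first + " new=" + reloaded);
  }
}
```

With the record list used in the PR's test, the old and the reloaded bitmap both end up containing doc ids 0, 2 and 4 in this sketch. Note that the two branches differ in the comparison operator as well as in whether the old bitmap is touched, so merging them would also need a decision on how ties are handled.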

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.
+              if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            }
+            return currentRecordLocation;
+          }
+
+          // Update the record location when getting a newer timestamp

Review comment:
       Wrap this in an else branch for better readability.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced

Review comment:
       It's worth explaining a bit more which data structures the updates won't be reflected in.

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManagerTest.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.upsert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager.RecordInfo;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+
+
+public class PartitionUpsertMetadataManagerTest {
+  private static final String SEGMENT_PREFIX = "testSegment";
+
+  @Test
+  public void testAddSegment() {
+    PartitionUpsertMetadataManager upsertMetadataManager = new PartitionUpsertMetadataManager();
+    Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add the first segment
+    String segment1 = SEGMENT_PREFIX + 1;
+    List<RecordInfo> recordInfoList1 = new ArrayList<>();
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 0, 100));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 1, 100));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(2), 2, 100));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 3, 80));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(1), 4, 120));
+    recordInfoList1.add(new RecordInfo(getPrimaryKey(0), 5, 100));
+    ThreadSafeMutableRoaringBitmap validDocIds1 =
+        upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+    // segment1: 0 -> {0, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 4});
+
+    // Add the second segment
+    String segment2 = SEGMENT_PREFIX + 2;
+    List<RecordInfo> recordInfoList2 = new ArrayList<>();
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 0, 100));
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(1), 1, 100));
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(2), 2, 120));
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(3), 3, 80));
+    recordInfoList2.add(new RecordInfo(getPrimaryKey(0), 4, 80));
+    ThreadSafeMutableRoaringBitmap validDocIds2 =
+        upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
+    // segment1: 0 -> {0, 100}, 1 -> {4, 120}
+    // segment2: 2 -> {2, 120}, 3 -> {3, 80}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2, 3});
+
+    // Replace (reload) the first segment
+    ThreadSafeMutableRoaringBitmap newValidDocIds1 =
+        upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
+    // original segment1: 0 -> {0, 100}, 1 -> {4, 120}

Review comment:
       Shall we include the removal as part of the replace? Should the removal of the old segment happen after the addition of the new one?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/RecordLocation.java
##########
@@ -28,13 +28,13 @@
   private final String _segmentName;
   private final int _docId;
   private final long _timestamp;
-  private final boolean _isConsuming;
+  private final ThreadSafeMutableRoaringBitmap _validDocIds;
 
-  public RecordLocation(String segmentName, int docId, long timestamp, boolean isConsuming) {
+  public RecordLocation(String segmentName, int docId, long timestamp, ThreadSafeMutableRoaringBitmap validDocIds) {

Review comment:
       Not sure if `ThreadSafeMutableRoaringBitmap` is the best identifier of the containing segment. Perhaps use the segmentImpl itself, in case the `ThreadSafeMutableRoaringBitmap` itself may be replaced?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
##########
@@ -50,25 +51,29 @@
   private final SegmentMetadataImpl _segmentMetadata;
   private final Map<String, ColumnIndexContainer> _indexContainerMap;
   private final StarTreeIndexContainer _starTreeIndexContainer;
-  private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
-  private final ValidDocIndexReader _validDocIndex;
+
+  // For upsert
+  private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+  private ThreadSafeMutableRoaringBitmap _validDocIds;
+  private ValidDocIndexReader _validDocIndex;
 
   public ImmutableSegmentImpl(SegmentDirectory segmentDirectory, SegmentMetadataImpl segmentMetadata,
       Map<String, ColumnIndexContainer> columnIndexContainerMap,
-      @Nullable StarTreeIndexContainer starTreeIndexContainer,
-      @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+      @Nullable StarTreeIndexContainer starTreeIndexContainer) {
     _segmentDirectory = segmentDirectory;
     _segmentMetadata = segmentMetadata;
     _indexContainerMap = columnIndexContainerMap;
     _starTreeIndexContainer = starTreeIndexContainer;
-    if (partitionUpsertMetadataManager != null) {
-      _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
-      _validDocIndex =
-          new ValidDocIndexReaderImpl(partitionUpsertMetadataManager.createValidDocIds(getSegmentName()));
-    } else {
-      _partitionUpsertMetadataManager = null;
-      _validDocIndex = null;
-    }
+  }
+
+  /**
+   * Enables upsert for this segment.
+   */
+  public void enableUpsert(PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+      ThreadSafeMutableRoaringBitmap validDocIds) {
+    _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
+    _validDocIds = validDocIds;
+    _validDocIndex = new ValidDocIndexReaderImpl(validDocIds);

Review comment:
       Is it possible that this immutable segment is queried before `enableUpsert` is invoked?
   
   If so, `_validDocIndex` will be null and will confuse the query plan.
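
One way to reason about this concern is a small sketch of a field that is set after construction and read by a query path. Everything here is an assumption for illustration: `LateUpsertSketch` and `isDocVisible` are hypothetical names, `java.util.BitSet` stands in for the valid doc ids, and treating every doc as visible before upsert is enabled is only one possible policy, not something the PR states.

```java
import java.util.BitSet;
import java.util.Objects;

class LateUpsertSketch {
  // Null until enableUpsert is called; volatile so a reader thread sees the assignment.
  private volatile BitSet validDocIds;

  /** Hypothetical counterpart of enableUpsert: installs the valid doc ids after construction. */
  void enableUpsert(BitSet validDocIds) {
    this.validDocIds = Objects.requireNonNull(validDocIds);
  }

  /**
   * Reads the field once into a local, so the null check and the lookup
   * observe the same reference even if enableUpsert runs concurrently.
   * Assumed policy: before upsert is enabled, every doc is visible.
   * (BitSet itself is not thread-safe; the real bitmap type is.)
   */
  boolean isDocVisible(int docId) {
    BitSet snapshot = validDocIds;
    return snapshot == null || snapshot.get(docId);
  }

  public static void main(String[] args) {
    LateUpsertSketch segment = new LateUpsertSketch();
    System.out.println("before enableUpsert: " + segment.isDocVisible(3));
    BitSet valid = new BitSet();
    valid.set(1);
    segment.enableUpsert(valid);
    System.out.println("after enableUpsert: " + segment.isDocVisible(3));
  }
}
```

Whether a query can actually reach the segment in the unset state depends on when the segment is registered with the table data manager, which this sketch does not model.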

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.
+              if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            }
+            return currentRecordLocation;
+          }
+
+          // Update the record location when getting a newer timestamp
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          } else {
+            return currentRecordLocation;
+          }
+        } else {
+          // New primary key
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+        }
+      });
     }
     return validDocIds;
   }
 
   /**
-   * Returns the valid doc ids for the given (immutable) segment.
-   */
-  public ThreadSafeMutableRoaringBitmap getValidDocIds(String segmentName) {
-    return Preconditions
-        .checkNotNull(_segmentToValidDocIdsMap.get(segmentName), "Failed to find valid doc ids for segment: %s",
-            segmentName);
-  }
-
-  /**
-   * Updates the record location of the given primary key if the given record location is newer than the current record
-   * location. Also updates the valid doc ids accordingly if the record location is updated.
+   * Updates the upsert metadata for a new consumed record in the given consuming segment.
    */
-  public void updateRecordLocation(PrimaryKey primaryKey, RecordLocation recordLocation,
+  public synchronized void updateRecord(String segmentName, RecordInfo recordInfo,
       ThreadSafeMutableRoaringBitmap validDocIds) {
-    _primaryKeyToRecordLocationMap.compute(primaryKey, (k, v) -> {
-      if (v != null) {
+    _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+      if (currentRecordLocation != null) {
         // Existing primary key
 
-        if (recordLocation.getTimestamp() >= v.getTimestamp()) {
-          // Update the record location
-          // NOTE: Update the record location when there is a tie on the timestamp because during the segment
-          //       commitment, when loading the committed segment, it should replace the old record locations in case
-          //       the order of records changed.
-
-          // Remove the doc from the valid doc ids of the previous location
-          if (v.isConsuming()) {
-            // Previous location is a consuming segment, whose valid doc ids are maintained locally. Only update the
-            // valid doc ids when the update is from the same segment.
-            if (recordLocation.isConsuming() && recordLocation.getSegmentName().equals(v.getSegmentName())) {
-              validDocIds.remove(v.getDocId());
-            }
-          } else {
-            ThreadSafeMutableRoaringBitmap validDocIdsForPreviousLocation =
-                _segmentToValidDocIdsMap.get(v.getSegmentName());
-            if (validDocIdsForPreviousLocation != null) {
-              validDocIdsForPreviousLocation.remove(v.getDocId());
-            } else {
-              LOGGER.warn("Failed to find valid doc ids for previous location: {}", v.getSegmentName());
-            }
-          }
-
-          validDocIds.add(recordLocation.getDocId());
-          return recordLocation;
+        // Update the record location when getting a newer timestamp
+        if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+          currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
         } else {
-          // No need to update
-          return v;
+          return currentRecordLocation;
         }
       } else {
         // New primary key
-        validDocIds.add(recordLocation.getDocId());
-        return recordLocation;
+        validDocIds.add(recordInfo._docId);
+        return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
       }
     });
   }
 
   /**
-   * Removes the upsert metadata for the given segment.
+   * Removes the upsert metadata for the given immutable segment. No need to remove the upsert metadata for the
+   * consuming segment because it should be replaced by the committed segment.
    */
-  public void removeSegment(String segmentName) {
+  public synchronized void removeSegment(String segmentName, ThreadSafeMutableRoaringBitmap validDocIds) {
     LOGGER.info("Removing upsert metadata for segment: {}", segmentName);
-    _primaryKeyToRecordLocationMap.forEach((k, v) -> {
-      if (v.getSegmentName().equals(segmentName)) {
-        // NOTE: Check and remove to prevent removing the key that is just updated.
-        _primaryKeyToRecordLocationMap.remove(k, v);
-      }
-    });
-    _segmentToValidDocIdsMap.remove(segmentName);
+
+    if (!validDocIds.getMutableRoaringBitmap().isEmpty()) {

Review comment:
       Does this check cover the case where a replaced segment must not remove the keys of the newly loaded segment? Perhaps we should consider keeping state that tracks the current segmentImpl (and its corresponding `validDocIds`) for each segment name?
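
The scenario in question can be illustrated with a small sketch in which removal is keyed on the bitmap instance rather than on the segment name alone. This is an assumption about one possible approach, not the PR's implementation: `RemoveSegmentSketch` and `Location` are hypothetical, string keys replace `PrimaryKey`, and `java.util.BitSet` replaces `ThreadSafeMutableRoaringBitmap`.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RemoveSegmentSketch {
  /** Hypothetical stand-in for RecordLocation, reduced to the fields needed here. */
  static final class Location {
    final String segmentName;
    final BitSet validDocIds;

    Location(String segmentName, BitSet validDocIds) {
      this.segmentName = segmentName;
      this.validDocIds = validDocIds;
    }
  }

  final Map<String, Location> keyToLocation = new ConcurrentHashMap<>();

  /**
   * Removes only the keys whose location still references the given bitmap instance.
   * Keys that already point to a newer instance of the same segment name are kept.
   */
  void removeSegment(String segmentName, BitSet validDocIds) {
    keyToLocation.forEach((key, location) -> {
      // Reference comparison on purpose: two empty BitSets are equal() but not the same instance.
      if (location.segmentName.equals(segmentName) && location.validDocIds == validDocIds) {
        // Conditional remove, so a key updated concurrently is not dropped.
        keyToLocation.remove(key, location);
      }
    });
  }

  public static void main(String[] args) {
    RemoveSegmentSketch manager = new RemoveSegmentSketch();
    BitSet oldBits = new BitSet();
    BitSet newBits = new BitSet();
    manager.keyToLocation.put("a", new Location("seg1", oldBits));
    manager.keyToLocation.put("b", new Location("seg1", newBits));
    manager.removeSegment("seg1", oldBits);
    System.out.println(manager.keyToLocation.keySet());
  }
}
```

In this sketch the key still owned by the old instance is removed while the key already moved to the reloaded instance survives, which is the property the comment asks about. Tracking the current segmentImpl per segment name, as suggested, would be an alternative way to get the same guarantee.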

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee on which record to be preserved.
+ * <p>There will be short term inconsistency when updating the upsert metadata, but should be consistent after the
+ * operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then update the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment being added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected to the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.
+              if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            }
+            return currentRecordLocation;
+          }
+
+          // Update the record location when getting a newer timestamp
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+            currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          } else {
+            return currentRecordLocation;
+          }
+        } else {
+          // New primary key
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+        }
+      });
     }
     return validDocIds;
   }
 
   /**
-   * Returns the valid doc ids for the given (immutable) segment.
-   */
-  public ThreadSafeMutableRoaringBitmap getValidDocIds(String segmentName) {
-    return Preconditions
-        .checkNotNull(_segmentToValidDocIdsMap.get(segmentName), "Failed to find valid doc ids for segment: %s",
-            segmentName);
-  }
-
-  /**
-   * Updates the record location of the given primary key if the given record location is newer than the current record
-   * location. Also updates the valid doc ids accordingly if the record location is updated.
+   * Updates the upsert metadata for a new consumed record in the given consuming segment.
    */
-  public void updateRecordLocation(PrimaryKey primaryKey, RecordLocation recordLocation,
+  public synchronized void updateRecord(String segmentName, RecordInfo recordInfo,
       ThreadSafeMutableRoaringBitmap validDocIds) {
-    _primaryKeyToRecordLocationMap.compute(primaryKey, (k, v) -> {
-      if (v != null) {
+    _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+      if (currentRecordLocation != null) {
         // Existing primary key
 
-        if (recordLocation.getTimestamp() >= v.getTimestamp()) {
-          // Update the record location
-          // NOTE: Update the record location when there is a tie on the timestamp because during the segment
-          //       commitment, when loading the committed segment, it should replace the old record locations in case
-          //       the order of records changed.
-
-          // Remove the doc from the valid doc ids of the previous location
-          if (v.isConsuming()) {
-            // Previous location is a consuming segment, whose valid doc ids are maintained locally. Only update the
-            // valid doc ids when the update is from the same segment.
-            if (recordLocation.isConsuming() && recordLocation.getSegmentName().equals(v.getSegmentName())) {
-              validDocIds.remove(v.getDocId());
-            }
-          } else {
-            ThreadSafeMutableRoaringBitmap validDocIdsForPreviousLocation =
-                _segmentToValidDocIdsMap.get(v.getSegmentName());
-            if (validDocIdsForPreviousLocation != null) {
-              validDocIdsForPreviousLocation.remove(v.getDocId());
-            } else {
-              LOGGER.warn("Failed to find valid doc ids for previous location: {}", v.getSegmentName());
-            }
-          }
-
-          validDocIds.add(recordLocation.getDocId());
-          return recordLocation;
+        // Update the record location when getting a newer timestamp
+        if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+          currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          validDocIds.add(recordInfo._docId);
+          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
         } else {
-          // No need to update
-          return v;
+          return currentRecordLocation;
         }
       } else {
         // New primary key
-        validDocIds.add(recordLocation.getDocId());
-        return recordLocation;
+        validDocIds.add(recordInfo._docId);
+        return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
       }
     });
   }
 
   /**
-   * Removes the upsert metadata for the given segment.
+   * Removes the upsert metadata for the given immutable segment. No need to remove the upsert metadata for the
+   * consuming segment because it should be replaced by the committed segment.

Review comment:
       How is the consuming segment related here?
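
For reference while reading the hunk above, here is a minimal sketch of the "update only on a strictly newer timestamp" rule that the new `updateRecord` applies. It is not the PR's code: `UpsertSketch`, `Location`, and the `String` primary key are hypothetical names, and a plain `Set<Integer>` stands in for `ThreadSafeMutableRoaringBitmap`. As in the diff, each location carries a reference to the valid doc ids of the segment it points to, so the previous doc id can be invalidated without a segment-name lookup.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the per-partition upsert metadata.
class UpsertSketch {
  // Assumed shape: where the latest record for a primary key currently lives.
  static final class Location {
    final String segment;
    final int docId;
    final long timestamp;
    final Set<Integer> validDocIds; // stand-in for the segment's bitmap

    Location(String segment, int docId, long timestamp, Set<Integer> validDocIds) {
      this.segment = segment;
      this.docId = docId;
      this.timestamp = timestamp;
      this.validDocIds = validDocIds;
    }
  }

  private final Map<String, Location> keyToLocation = new ConcurrentHashMap<>();

  void updateRecord(String segment, String primaryKey, int docId, long timestamp,
      Set<Integer> validDocIds) {
    keyToLocation.compute(primaryKey, (key, current) -> {
      if (current != null && timestamp <= current.timestamp) {
        // Older record or a tie: keep the current location untouched.
        return current;
      }
      if (current != null) {
        // Invalidate the doc id at the previous location, whichever segment owns it.
        current.validDocIds.remove(Integer.valueOf(current.docId));
      }
      validDocIds.add(docId);
      return new Location(segment, docId, timestamp, validDocIds);
    });
  }

  Location get(String primaryKey) {
    return keyToLocation.get(primaryKey);
  }
}
```

In this sketch a tie leaves the existing location in place, which matches the strict `>` comparison in the diff's `updateRecord`; the `>=` branch in the segment-loading path (replacing an old segment) is deliberately not modeled.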

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
##########
@@ -117,8 +119,9 @@ public void buildSegment()
   public void loadSegment()
       throws Exception {
     _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
-    _upsertIndexSegment = ImmutableSegmentLoader
-        .load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap, new PartitionUpsertMetadataManager());
+    _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
+    ((ImmutableSegmentImpl) _upsertIndexSegment)
+        .enableUpsert(new PartitionUpsertMetadataManager(), new ThreadSafeMutableRoaringBitmap());

Review comment:
       nit: I think it's preferable to enable upsert as early as possible (i.e., in the constructor), since we already know this segment will be an upsert one.
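
A rough sketch of what constructor-time enabling could look like, purely to illustrate the suggestion. Every name here (`SegmentSketch`, `UpsertState`) is hypothetical and none of it is the actual `ImmutableSegmentImpl` or `ImmutableSegmentLoader` API; whether the loader can pass the upsert state through at construction time is an assumption.

```java
// Hypothetical holder for whatever the segment needs to participate in upsert.
final class UpsertState {
  final String partitionName;

  UpsertState(String partitionName) {
    this.partitionName = partitionName;
  }
}

// Hypothetical segment class: the upsert state is fixed at construction,
// so there is no window in which the segment exists without it.
final class SegmentSketch {
  private final String name;
  private final UpsertState upsertState; // null means upsert is disabled

  // Non-upsert segment.
  SegmentSketch(String name) {
    this(name, null);
  }

  // Upsert segment: state is supplied up front instead of via a later enableUpsert() call.
  SegmentSketch(String name, UpsertState upsertState) {
    this.name = name;
    this.upsertState = upsertState;
  }

  String getName() {
    return name;
  }

  boolean isUpsertEnabled() {
    return upsertState != null;
  }
}
```

Making the field `final` also means callers cannot forget the second step or enable upsert twice, which is the main appeal over a post-load `enableUpsert(...)` call.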



