You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/07/29 20:29:51 UTC

[pinot] branch master updated: Fix upsert replace (#9132)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 91289be930 Fix upsert replace (#9132)
91289be930 is described below

commit 91289be930754dee74776e804ce388d005b06e3b
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 29 13:29:44 2022 -0700

    Fix upsert replace (#9132)
    
    - Fix upsert replace to
      - Correctly replace record location without removing the valid docs from the replaced segment
      - Correctly track and remove the remaining valid docs from the replaced segment
      - Track replaced segment so that the docs do not need to be removed again
    - Use a segment lock to prevent adding/replacing/removing the same segment at the same time, which can cause a race condition
    - Track an unexpected case where the primary keys show up in the wrong segment using meter `UPSERT_KEYS_IN_WRONG_SEGMENT`
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   3 +-
 .../manager/realtime/RealtimeTableDataManager.java |   7 +-
 .../upsert/PartitionUpsertMetadataManager.java     | 191 ++++++++++++++++-----
 .../pinot/segment/local/utils}/SegmentLocks.java   |   2 +-
 .../upsert/PartitionUpsertMetadataManagerTest.java |  93 ++++------
 .../starter/helix/HelixInstanceDataManager.java    |   1 +
 .../SegmentOnlineOfflineStateModelFactory.java     |   1 +
 7 files changed, 188 insertions(+), 110 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 22bb82abd4..c0d173fc76 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -41,8 +41,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
   REALTIME_PARTITION_MISMATCH("mismatch", false),
   REALTIME_DEDUP_DROPPED("rows", false),
+  UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false),
   PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
-  PARTIAL_UPSERT_ROWS_NOT_REPLACED("rows", false),
+  PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
   ROWS_WITH_ERRORS("rows", false),
   LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
   LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 654133af11..0a09ff4087 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -430,12 +430,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
     } else {
       IndexSegment oldSegment = oldSegmentManager.getSegment();
-      partitionUpsertMetadataManager.addSegment(immutableSegment);
-      // TODO: Fix the following issue about replacing segment in upsert metadata
-      //   - We cannot directly invalidate the docs in the replaced segment because query might still running against it
-      //   - We should track the valid docs in the replaced segment separately. Currently the docs won't be invalidate
-      //     in the replaced segment due to the reason above, and will cause wrong logs/metrics emitted.
-      // partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
+      partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
       _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
           oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
       releaseSegment(oldSegmentManager);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 39455c569e..d1042fe5c1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -23,9 +23,12 @@ import com.google.common.base.Preconditions;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
@@ -36,6 +39,7 @@ import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.local.utils.RecordInfo;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
@@ -92,6 +96,9 @@ public class PartitionUpsertMetadataManager {
   @VisibleForTesting
   final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
+  @VisibleForTesting
+  final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
+
   // Reused for reading previous record during partial upsert
   private final GenericRow _reuse = new GenericRow();
 
@@ -122,6 +129,12 @@ public class PartitionUpsertMetadataManager {
    * Initializes the upsert metadata for the given immutable segment.
    */
   public void addSegment(ImmutableSegment segment) {
+    addSegment(segment, null, null);
+  }
+
+  @VisibleForTesting
+  void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable Iterator<RecordInfo> recordInfoIterator) {
     String segmentName = segment.getSegmentName();
     _logger.info("Adding segment: {}, current primary key count: {}", segmentName,
         _primaryKeyToRecordLocationMap.size());
@@ -131,10 +144,22 @@ public class PartitionUpsertMetadataManager {
       return;
     }
 
-    Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
-        "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
-        _tableNameWithType);
-    addSegment((ImmutableSegmentImpl) segment, new ThreadSafeMutableRoaringBitmap(), getRecordInfoIterator(segment));
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+          "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
+          _tableNameWithType);
+      if (validDocIds == null) {
+        validDocIds = new ThreadSafeMutableRoaringBitmap();
+      }
+      if (recordInfoIterator == null) {
+        recordInfoIterator = getRecordInfoIterator(segment);
+      }
+      addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null);
+    } finally {
+      segmentLock.unlock();
+    }
 
     // Update metrics
     int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
@@ -180,11 +205,13 @@ public class PartitionUpsertMetadataManager {
     }
   }
 
-  @VisibleForTesting
-  void addSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
-      Iterator<RecordInfo> recordInfoIterator) {
+  private void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
+      @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
     segment.enableUpsert(this, validDocIds);
+
+    AtomicInteger numKeysInWrongSegment = new AtomicInteger();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
       _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
@@ -198,7 +225,7 @@ public class PartitionUpsertMetadataManager {
               // The current record is in the same segment
               // Update the record location when there is a tie to keep the newer record. Note that the record info
               // iterator will return records with incremental doc ids.
-              if (segment == currentSegment) {
+              if (currentSegment == segment) {
                 if (comparisonResult >= 0) {
                   validDocIds.replace(currentRecordLocation.getDocId(), recordInfo.getDocId());
                   return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
@@ -211,9 +238,25 @@ public class PartitionUpsertMetadataManager {
               // This could happen when committing a consuming segment, or reloading a completed segment. In this
               // case, we want to update the record location when there is a tie because the record locations should
               // point to the new added segment instead of the old segment being replaced. Also, do not update the valid
-              // doc ids for the old segment because it has not been replaced yet.
+              // doc ids for the old segment because it has not been replaced yet. We pass in an optional valid doc ids
+              // snapshot for the old segment, which can be updated and used to track the docs not replaced yet.
+              if (currentSegment == oldSegment) {
+                if (comparisonResult >= 0) {
+                  validDocIds.add(recordInfo.getDocId());
+                  if (validDocIdsForOldSegment != null) {
+                    validDocIdsForOldSegment.remove(currentRecordLocation.getDocId());
+                  }
+                  return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+                } else {
+                  return currentRecordLocation;
+                }
+              }
+
+              // This should not happen because the previously replaced segment should have all keys removed. We still
+              // handle it here, and also track the number of keys not properly replaced previously.
               String currentSegmentName = currentSegment.getSegmentName();
-              if (segmentName.equals(currentSegmentName)) {
+              if (currentSegmentName.equals(segmentName)) {
+                numKeysInWrongSegment.getAndIncrement();
                 if (comparisonResult >= 0) {
                   validDocIds.add(recordInfo.getDocId());
                   return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
@@ -243,6 +286,11 @@ public class PartitionUpsertMetadataManager {
             }
           });
     }
+    int numKeys = numKeysInWrongSegment.get();
+    if (numKeys > 0) {
+      _logger.warn("Found {} primary keys in the wrong segment when adding segment: {}", numKeys, segmentName);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys);
+    }
   }
 
   /**
@@ -285,36 +333,73 @@ public class PartitionUpsertMetadataManager {
   /**
    * Replaces the upsert metadata for the old segment with the new immutable segment.
    */
-  public void replaceSegment(ImmutableSegment newSegment, IndexSegment oldSegment) {
-    String segmentName = newSegment.getSegmentName();
+  public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
+    replaceSegment(segment, null, null, oldSegment);
+  }
+
+  @VisibleForTesting
+  void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
+    String segmentName = segment.getSegmentName();
     Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
         "Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}",
         _tableNameWithType, oldSegment.getSegmentName(), segmentName);
-    _logger.info("Replacing {} segment: {}", oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
-        segmentName);
-
-    addSegment(newSegment);
-
-    MutableRoaringBitmap validDocIds =
-        oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
-    if (validDocIds != null && !validDocIds.isEmpty()) {
-      int numDocsNotReplaced = validDocIds.getCardinality();
-      if (_partialUpsertHandler != null) {
-        // For partial-upsert table, because we do not restore the original record location when removing the primary
-        // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a consuming
-        // segment is replaced by a committed segment that is consumed from a different server with different records
-        // (some stream consumer cannot guarantee consuming the messages in the same order).
-        _logger.error("{} primary keys not replaced when replacing segment: {} for partial-upsert table. This can "
-            + "potentially cause inconsistency between replicas", numDocsNotReplaced, segmentName);
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_ROWS_NOT_REPLACED,
-            numDocsNotReplaced);
+    _logger.info("Replacing {} segment: {}, current primary key count: {}",
+        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName,
+        _primaryKeyToRecordLocationMap.size());
+
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      MutableRoaringBitmap validDocIdsForOldSegment =
+          oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (segment instanceof EmptyIndexSegment) {
+        _logger.info("Skip adding empty segment: {}", segmentName);
       } else {
-        _logger.info("{} primary keys not replaced when replacing segment: {}", numDocsNotReplaced, segmentName);
+        Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+            "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
+            _tableNameWithType);
+        if (validDocIds == null) {
+          validDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        if (recordInfoIterator == null) {
+          recordInfoIterator = getRecordInfoIterator(segment);
+        }
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment,
+            validDocIdsForOldSegment);
       }
-      removeSegment(oldSegment);
+
+      if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
+        int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+        if (_partialUpsertHandler != null) {
+          // For partial-upsert table, because we do not restore the original record location when removing the primary
+          // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
+          // consuming segment is replaced by a committed segment that is consumed from a different server with
+          // different records (some stream consumer cannot guarantee consuming the messages in the same order).
+          _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
+              + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
+          _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+              numKeysNotReplaced);
+        } else {
+          _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced,
+              segmentName);
+        }
+        removeSegment(oldSegment, validDocIdsForOldSegment);
+      }
+    } finally {
+      segmentLock.unlock();
+    }
+
+    if (!(oldSegment instanceof EmptyIndexSegment)) {
+      _replacedSegments.add(oldSegment);
     }
 
-    _logger.info("Finished replacing segment: {}", segmentName);
+    // Update metrics
+    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
+    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+        numPrimaryKeys);
+
+    _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
   }
 
   /**
@@ -326,14 +411,37 @@ public class PartitionUpsertMetadataManager {
         segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName,
         _primaryKeyToRecordLocationMap.size());
 
-    MutableRoaringBitmap validDocIds =
-        segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
-    if (validDocIds == null || validDocIds.isEmpty()) {
-      _logger.info("Skip removing segment without valid docs: {}", segmentName);
+    if (_replacedSegments.remove(segment)) {
+      _logger.info("Skip removing replaced segment: {}", segmentName);
       return;
     }
 
-    _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      MutableRoaringBitmap validDocIds =
+          segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (validDocIds == null || validDocIds.isEmpty()) {
+        _logger.info("Skip removing segment without valid docs: {}", segmentName);
+        return;
+      }
+
+      _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
+      removeSegment(segment, validDocIds);
+    } finally {
+      segmentLock.unlock();
+    }
+
+    // Update metrics
+    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
+    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+        numPrimaryKeys);
+
+    _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+  }
+
+  private void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+    assert !validDocIds.isEmpty();
     PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
     PeekableIntIterator iterator = validDocIds.getIntIterator();
     while (iterator.hasNext()) {
@@ -347,13 +455,6 @@ public class PartitionUpsertMetadataManager {
             return recordLocation;
           });
     }
-
-    // Update metrics
-    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
-    _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
   }
 
   /**
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
similarity index 96%
rename from pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java
rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
index 0a14d05a7b..abbd2c906c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.starter.helix;
+package org.apache.pinot.segment.local.utils;
 
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index 639f6321e4..aa1392d6df 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -45,8 +45,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
 
 
 public class PartitionUpsertMetadataManagerTest {
@@ -54,13 +54,13 @@ public class PartitionUpsertMetadataManagerTest {
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
   @Test
-  public void testAddSegment() {
-    verifyAddSegment(HashFunction.NONE);
-    verifyAddSegment(HashFunction.MD5);
-    verifyAddSegment(HashFunction.MURMUR3);
+  public void testAddReplaceRemoveSegment() {
+    verifyAddReplaceRemoveSegment(HashFunction.NONE);
+    verifyAddReplaceRemoveSegment(HashFunction.MD5);
+    verifyAddReplaceRemoveSegment(HashFunction.MURMUR3);
   }
 
-  private void verifyAddSegment(HashFunction hashFunction) {
+  private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) {
     PartitionUpsertMetadataManager upsertMetadataManager =
         new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol",
             hashFunction, null, mock(ServerMetrics.class));
@@ -76,6 +76,7 @@ public class PartitionUpsertMetadataManagerTest {
     List<RecordInfo> recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
     upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
     checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
@@ -91,6 +92,7 @@ public class PartitionUpsertMetadataManagerTest {
         getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -99,9 +101,11 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
 
     // Add an empty segment
-    upsertMetadataManager.addSegment(new EmptyIndexSegment(mock(SegmentMetadataImpl.class)));
+    EmptyIndexSegment emptySegment = mockEmptySegment(3);
+    upsertMetadataManager.addSegment(emptySegment);
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -112,10 +116,11 @@ public class PartitionUpsertMetadataManagerTest {
     // Replace (reload) the first segment
     ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, primaryKeys1);
-    upsertMetadataManager.addSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator());
-    // original segment1: 1 -> {4, 120}
+    upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator(), segment1);
+    // original segment1: 1 -> {4, 120} (not in the map)
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -123,34 +128,42 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(),
-        newSegment1);
+    assertEquals(upsertMetadataManager._replacedSegments, Collections.singleton(segment1));
 
     // Remove the original segment1
     upsertMetadataManager.removeSegment(segment1);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(),
-        newSegment1);
+    assertTrue(upsertMetadataManager._replacedSegments.isEmpty());
 
-    // Remove an empty segment
-    upsertMetadataManager.removeSegment(new EmptyIndexSegment(mock(SegmentMetadataImpl.class)));
+    // Remove the empty segment
+    upsertMetadataManager.removeSegment(emptySegment);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
     checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(),
-        newSegment1);
+
+    // Remove segment2
+    upsertMetadataManager.removeSegment(segment2);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
   }
 
   private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
@@ -179,6 +192,12 @@ public class PartitionUpsertMetadataManagerTest {
     return segment;
   }
 
+  private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
+    return new EmptyIndexSegment(segmentMetadata);
+  }
+
   private static MutableSegment mockMutableSegment(int sequenceNumber, ThreadSafeMutableRoaringBitmap validDocIds) {
     MutableSegment segment = mock(MutableSegment.class);
     when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
@@ -272,46 +291,6 @@ public class PartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
   }
 
-  @Test
-  public void testRemoveSegment() {
-    verifyRemoveSegment(HashFunction.NONE);
-    verifyRemoveSegment(HashFunction.MD5);
-    verifyRemoveSegment(HashFunction.MURMUR3);
-  }
-
-  private void verifyRemoveSegment(HashFunction hashFunction) {
-    PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol",
-            hashFunction, null, mock(ServerMetrics.class));
-    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
-
-    // Add 2 segments
-    // segment1: 0 -> {0, 100}, 1 -> {1, 100}
-    // segment2: 2 -> {0, 100}, 3 -> {0, 100}
-    int numRecords = 2;
-    int[] primaryKeys = new int[]{0, 1};
-    int[] timestamps = new int[]{100, 100};
-    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
-    ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
-    upsertMetadataManager.addSegment(segment1, validDocIds1,
-        getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
-
-    primaryKeys = new int[]{2, 3};
-    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
-    ImmutableSegmentImpl segment2 = mockImmutableSegment(2, validDocIds2, getPrimaryKeyList(numRecords, primaryKeys));
-    upsertMetadataManager.addSegment(segment2, validDocIds2,
-        getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
-
-    // Remove the first segment
-    upsertMetadataManager.removeSegment(segment1);
-    // segment2: 2 -> {0, 100}, 3 -> {0, 100}
-    assertNull(recordLocationMap.get(makePrimaryKey(0)));
-    assertNull(recordLocationMap.get(makePrimaryKey(1)));
-    checkRecordLocation(recordLocationMap, 2, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 1, 100, hashFunction);
-    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
-  }
-
   @Test
   public void testHashPrimaryKey() {
     PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9904b95cb0..132fe44fe9 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -59,6 +59,7 @@ import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index d2e4c30ca6..9e541a29f2 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org