You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/02/09 18:29:46 UTC

[pinot] branch master updated: Avoid upsert metadata access after the manager is closed (#10251)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c57462d3b Avoid upsert metadata access after the manager is closed (#10251)
2c57462d3b is described below

commit 2c57462d3bd3e5b2e15283e45f263adb773f1dc8
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 9 10:29:38 2023 -0800

    Avoid upsert metadata access after the manager is closed (#10251)
---
 .../core/data/manager/BaseTableDataManager.java    |   8 +
 .../realtime/LLRealtimeSegmentDataManager.java     |   5 +
 .../manager/realtime/RealtimeTableDataManager.java |  12 +-
 .../local/data/manager/TableDataManager.java       |   2 +
 .../upsert/BasePartitionUpsertMetadataManager.java | 161 ++++++++++++++++++---
 ...oncurrentMapPartitionUpsertMetadataManager.java |  10 +-
 .../ConcurrentMapTableUpsertMetadataManager.java   |  12 +-
 .../upsert/PartitionUpsertMetadataManager.java     |   5 +
 .../local/upsert/TableUpsertMetadataManager.java   |   5 +
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  43 +++++-
 10 files changed, 223 insertions(+), 40 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 6ce2c3e451..7a3a32f903 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -107,6 +107,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
   // Cache used for identifying segments which could not be acquired since they were recently deleted.
   protected Cache<String, String> _recentlyDeletedSegments;
 
+  protected volatile boolean _shutDown;
+
   @Override
   public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
@@ -182,12 +184,18 @@ public abstract class BaseTableDataManager implements TableDataManager {
   @Override
   public void shutDown() {
     _logger.info("Shutting down table data manager for table: {}", _tableNameWithType);
+    _shutDown = true;
     doShutdown();
     _logger.info("Shut down table data manager for table: {}", _tableNameWithType);
   }
 
   protected abstract void doShutdown();
 
+  @Override
+  public boolean isShutDown() {
+    return _shutDown;
+  }
+
   /**
    * {@inheritDoc}
    * <p>If one segment already exists with the same name, replaces it with the new one.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6e43a77202..4d82f6953e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -873,6 +873,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
     closeStreamConsumers();
+    // Do not allow building segment when table data manager is already shut down
+    if (_realtimeTableDataManager.isShutDown()) {
+      _segmentLogger.warn("Table data manager is already shut down");
+      return null;
+    }
     try {
       final long startTimeMillis = now();
       if (_segBuildSemaphore != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 9315770943..96adea8ee6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -246,14 +246,20 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   @Override
   protected void doShutdown() {
     if (_tableUpsertMetadataManager != null) {
+      // Stop the upsert metadata manager first to prevent removing metadata when destroying segments
+      _tableUpsertMetadataManager.stop();
+      for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
+        segmentDataManager.destroy();
+      }
       try {
         _tableUpsertMetadataManager.close();
       } catch (IOException e) {
         _logger.warn("Cannot close upsert metadata manager properly for table: {}", _tableNameWithType, e);
       }
-    }
-    for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
-      segmentDataManager.destroy();
+    } else {
+      for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
+        segmentDataManager.destroy();
+      }
     }
     if (_leaseExtender != null) {
       _leaseExtender.shutDown();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 121e47f6f6..ae53ce3383 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -62,6 +62,8 @@ public interface TableDataManager {
    */
   void shutDown();
 
+  boolean isShutDown();
+
   /**
    * Adds a loaded immutable segment into the table.
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 6bc9e94067..361052fdc2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -37,8 +38,10 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +64,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @VisibleForTesting
   public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
 
-  protected volatile boolean _closed = false;
+  protected volatile boolean _stopped = false;
+  // Initialize with 1 pending operation to indicate the metadata manager can take more operations
+  protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
 
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
   protected int _numOutOfOrderEvents = 0;
@@ -87,6 +92,19 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   @Override
   public void addSegment(ImmutableSegment segment) {
+    if (_stopped) {
+      _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
+      return;
+    }
+    startOperation();
+    try {
+      doAddSegment(segment);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  protected void doAddSegment(ImmutableSegment segment) {
     String segmentName = segment.getSegmentName();
     _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
 
@@ -162,8 +180,38 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
       @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
 
+  @Override
+  public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
+    if (_stopped) {
+      _logger.debug("Skip adding record to segment: {} because metadata manager is already stopped",
+          segment.getSegmentName());
+      return;
+    }
+    startOperation();
+    try {
+      doAddRecord(segment, recordInfo);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  protected abstract void doAddRecord(MutableSegment segment, RecordInfo recordInfo);
+
   @Override
   public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
+    if (_stopped) {
+      _logger.info("Skip replacing segment: {} because metadata manager is already stopped", segment.getSegmentName());
+      return;
+    }
+    startOperation();
+    try {
+      doReplaceSegment(segment, oldSegment);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  protected void doReplaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
     String segmentName = segment.getSegmentName();
     Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
         "Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}",
@@ -250,34 +298,43 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @Override
   public void removeSegment(IndexSegment segment) {
     String segmentName = segment.getSegmentName();
-    _logger.info("Removing {} segment: {}, current primary key count: {}",
-        segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
-
     if (_replacedSegments.remove(segment)) {
       _logger.info("Skip removing replaced segment: {}", segmentName);
       return;
     }
+    // Allow persisting valid doc ids snapshot after metadata manager is stopped
+    MutableRoaringBitmap validDocIds =
+        segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+    if (_enableSnapshot && segment instanceof ImmutableSegmentImpl) {
+      ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
+    }
+    if (validDocIds == null || validDocIds.isEmpty()) {
+      _logger.info("Skip removing segment without valid docs: {}", segmentName);
+      return;
+    }
+    if (_stopped) {
+      _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
+      return;
+    }
+    startOperation();
+    try {
+      doRemoveSegment(segment);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  protected void doRemoveSegment(IndexSegment segment) {
+    String segmentName = segment.getSegmentName();
+    _logger.info("Removing {} segment: {}, current primary key count: {}",
+        segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
 
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
-      MutableRoaringBitmap validDocIds =
-          segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
-
-      if (_enableSnapshot && segment instanceof ImmutableSegmentImpl && validDocIds != null) {
-        ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
-      }
-
-      if (_closed) {
-        _logger.info("Skip removing segment: {} because metadata manager is already closed", segment);
-        return;
-      }
-
-      if (validDocIds == null || validDocIds.isEmpty()) {
-        _logger.info("Skip removing segment without valid docs: {}", segmentName);
-        return;
-      }
-
+      assert segment.getValidDocIds() != null;
+      MutableRoaringBitmap validDocIds = segment.getValidDocIds().getMutableRoaringBitmap();
+      assert validDocIds != null;
       _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
       removeSegment(segment, validDocIds);
     } finally {
@@ -292,6 +349,26 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
   }
 
+  @Override
+  public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
+    // Directly return the record when partial-upsert is not enabled
+    if (_partialUpsertHandler == null) {
+      return record;
+    }
+    if (_stopped) {
+      _logger.debug("Skip updating record because metadata manager is already stopped");
+      return record;
+    }
+    startOperation();
+    try {
+      return doUpdateRecord(record, recordInfo);
+    } finally {
+      finishOperation();
+    }
+  }
+
+  protected abstract GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo);
+
   protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recordComparisonValue) {
     _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
     _numOutOfOrderEvents++;
@@ -305,10 +382,48 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  protected void startOperation() {
+    _numPendingOperations.getAndIncrement();
+  }
+
+  protected void finishOperation() {
+    if (_numPendingOperations.decrementAndGet() == 0) {
+      synchronized (_numPendingOperations) {
+        _numPendingOperations.notifyAll();
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    _stopped = true;
+    int numPendingOperations = _numPendingOperations.decrementAndGet();
+    _logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}",
+        numPendingOperations, getNumPrimaryKeys());
+  }
+
   @Override
   public void close()
       throws IOException {
-    _logger.info("Closing the metadata manager, current primary key count: {}", getNumPrimaryKeys());
-    _closed = true;
+    Preconditions.checkState(_stopped, "Must stop the metadata manager before closing it");
+    _logger.info("Closing the metadata manager");
+    synchronized (_numPendingOperations) {
+      int numPendingOperations;
+      while ((numPendingOperations = _numPendingOperations.get()) != 0) {
+        _logger.info("Waiting for {} pending operations to finish", numPendingOperations);
+        try {
+          _numPendingOperations.wait();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(
+              String.format("Interrupted while waiting for %d pending operations to finish", numPendingOperations), e);
+        }
+      }
+    }
+    doClose();
+    _logger.info("Closed the metadata manager");
+  }
+
+  protected void doClose()
+      throws IOException {
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index e11c1899f3..b1687afdf8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -182,7 +182,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   }
 
   @Override
-  public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
+  protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
     ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
     _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
         (primaryKey, currentRecordLocation) -> {
@@ -217,12 +217,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   }
 
   @Override
-  public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
-    // Directly return the record when partial-upsert is not enabled
-    if (_partialUpsertHandler == null) {
-      return record;
-    }
-
+  protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
+    assert _partialUpsertHandler != null;
     AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>();
     RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent(
         HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 3f830bb3a7..67d6fe773b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -39,12 +39,18 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
             _comparisonColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
   }
 
+  @Override
+  public void stop() {
+    for (ConcurrentMapPartitionUpsertMetadataManager metadataManager : _partitionMetadataManagerMap.values()) {
+      metadataManager.stop();
+    }
+  }
+
   @Override
   public void close()
       throws IOException {
-    for (ConcurrentMapPartitionUpsertMetadataManager partitionUpsertMetadataManager
-        : _partitionMetadataManagerMap.values()) {
-      partitionUpsertMetadataManager.close();
+    for (ConcurrentMapPartitionUpsertMetadataManager metadataManager : _partitionMetadataManagerMap.values()) {
+      metadataManager.close();
     }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index ef5ec7c414..dd07143fd3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -83,4 +83,9 @@ public interface PartitionUpsertMetadataManager extends Closeable {
    * Returns the merged record when partial-upsert is enabled.
    */
   GenericRow updateRecord(GenericRow record, RecordInfo recordInfo);
+
+  /**
+   * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
+   */
+  void stop();
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index de46879712..52007f16fe 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -38,4 +38,9 @@ public interface TableUpsertMetadataManager extends Closeable {
   PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId);
 
   UpsertConfig.Mode getUpsertMode();
+
+  /**
+   * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
+   */
+  void stop();
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index a0f18aaf14..6b4bf34368 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -60,7 +61,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
   @Test
-  public void testAddReplaceRemoveSegment() {
+  public void testAddReplaceRemoveSegment()
+      throws IOException {
     verifyAddReplaceRemoveSegment(HashFunction.NONE, false);
     verifyAddReplaceRemoveSegment(HashFunction.MD5, false);
     verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, false);
@@ -69,7 +71,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true);
   }
 
-  private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) {
+  private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
+      throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
@@ -196,6 +199,19 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Remove new segment1, should be no-op
+    upsertMetadataManager.removeSegment(newSegment1);
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
   }
 
   private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
@@ -274,13 +290,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   }
 
   @Test
-  public void testAddRecord() {
+  public void testAddRecord()
+      throws IOException {
     verifyAddRecord(HashFunction.NONE);
     verifyAddRecord(HashFunction.MD5);
     verifyAddRecord(HashFunction.MURMUR3);
   }
 
-  private void verifyAddRecord(HashFunction hashFunction) {
+  private void verifyAddRecord(HashFunction hashFunction)
+      throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
@@ -339,6 +357,23 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Add record should be no-op
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120)));
+    // segment1: 1 -> {1, 120}
+    // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org