You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/12 00:39:46 UTC

(pinot) branch master updated: Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 92e26a5847 Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120)
92e26a5847 is described below

commit 92e26a584723e02927fb25819c07174f2feae594
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Dec 11 16:39:40 2023 -0800

    Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120)
---
 .../manager/realtime/RealtimeTableDataManager.java |   3 +-
 ...adataAndDictionaryAggregationPlanMakerTest.java |  18 +-
 .../upsert/BasePartitionUpsertMetadataManager.java |  31 ++--
 .../upsert/BaseTableUpsertMetadataManager.java     |  80 ++++-----
 ...oncurrentMapPartitionUpsertMetadataManager.java |  20 +--
 .../ConcurrentMapTableUpsertMetadataManager.java   |   4 +-
 .../local/upsert/TableUpsertMetadataManager.java   |   6 +-
 .../pinot/segment/local/upsert/UpsertContext.java  | 197 +++++++++++++++++++++
 .../MutableSegmentImplUpsertComparisonColTest.java |  37 ++--
 .../mutable/MutableSegmentImplUpsertTest.java      |  26 ++-
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 147 +++++++--------
 ...oncurrentMapTableUpsertMetadataManagerTest.java |  27 +--
 12 files changed, 395 insertions(+), 201 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index ffe0d47b46..8bf595d37a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -207,8 +207,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       // NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to
       //       load segments into it
       _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig);
-      _tableUpsertMetadataManager.init(tableConfig, schema, this, _serverMetrics, _helixManager,
-          _segmentPreloadExecutor);
+      _tableUpsertMetadataManager.init(tableConfig, schema, this, _helixManager, _segmentPreloadExecutor);
     }
 
     // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index a73f9c471f..cf3e33f938 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -38,12 +38,12 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.UpsertContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -54,7 +54,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeClass;
@@ -62,6 +61,7 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
@@ -126,13 +126,17 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
   @BeforeClass
   public void loadSegment()
       throws Exception {
+    ServerMetrics.register(mock(ServerMetrics.class));
     _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
-    ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
-    ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
-        new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null,
-            false, 0, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
+    UpsertContext upsertContext =
+        new UpsertContext.Builder().setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+            .setPrimaryKeyColumns(Collections.singletonList("column6"))
+            .setComparisonColumns(Collections.singletonList("daysSinceEpoch")).setTableIndexDir(INDEX_DIR).build();
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, upsertContext);
+    ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(upsertMetadataManager,
+        new ThreadSafeMutableRoaringBitmap(), null);
   }
 
   @AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 9184da1587..b7f9696b11 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -65,6 +65,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   protected final String _tableNameWithType;
   protected final int _partitionId;
+  protected final UpsertContext _context;
   protected final List<String> _primaryKeyColumns;
   protected final List<String> _comparisonColumns;
   protected final String _deleteRecordColumn;
@@ -97,25 +98,23 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   private int _numPendingOperations = 1;
   private boolean _closed;
 
-  protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
-      List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
-      HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+  protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
-    _primaryKeyColumns = primaryKeyColumns;
-    _comparisonColumns = comparisonColumns;
-    _deleteRecordColumn = deleteRecordColumn;
-    _hashFunction = hashFunction;
-    _partialUpsertHandler = partialUpsertHandler;
-    _enableSnapshot = enableSnapshot;
-    _metadataTTL = metadataTTL;
-    _deletedKeysTTL = deletedKeysTTL;
-    _tableIndexDir = tableIndexDir;
-    _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
-    _serverMetrics = serverMetrics;
+    _context = context;
+    _primaryKeyColumns = context.getPrimaryKeyColumns();
+    _comparisonColumns = context.getComparisonColumns();
+    _deleteRecordColumn = context.getDeleteRecordColumn();
+    _hashFunction = context.getHashFunction();
+    _partialUpsertHandler = context.getPartialUpsertHandler();
+    _enableSnapshot = context.isSnapshotEnabled();
+    _snapshotLock = _enableSnapshot ? new ReentrantReadWriteLock() : null;
+    _metadataTTL = context.getMetadataTTL();
+    _deletedKeysTTL = context.getDeletedKeysTTL();
+    _tableIndexDir = context.getTableIndexDir();
+    _serverMetrics = ServerMetrics.get();
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
-    if (metadataTTL > 0) {
+    if (_metadataTTL > 0) {
       _largestSeenComparisonValue = loadWatermark();
     } else {
       _largestSeenComparisonValue = Double.MIN_VALUE;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 6c16b4e7f5..e20b264c70 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -36,7 +36,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -56,76 +55,67 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableUpsertMetadataManager.class);
 
-  protected TableConfig _tableConfig;
-  protected Schema _schema;
-  protected TableDataManager _tableDataManager;
   protected String _tableNameWithType;
-  protected List<String> _primaryKeyColumns;
-  protected List<String> _comparisonColumns;
-  protected String _deleteRecordColumn;
-  protected HashFunction _hashFunction;
-  protected PartialUpsertHandler _partialUpsertHandler;
-  protected boolean _enableSnapshot;
-  protected double _metadataTTL;
-  protected double _deletedKeysTTL;
-  protected File _tableIndexDir;
-  protected ServerMetrics _serverMetrics;
+  protected TableDataManager _tableDataManager;
   protected HelixManager _helixManager;
   protected ExecutorService _segmentPreloadExecutor;
+  protected UpsertContext _context;
 
   private volatile boolean _isPreloading = false;
 
   @Override
-  public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager,
-      ServerMetrics serverMetrics, HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor) {
-    _tableConfig = tableConfig;
-    _schema = schema;
-    _tableDataManager = tableDataManager;
+  public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, HelixManager helixManager,
+      @Nullable ExecutorService segmentPreloadExecutor) {
     _tableNameWithType = tableConfig.getTableName();
+    _tableDataManager = tableDataManager;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
 
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     Preconditions.checkArgument(upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE,
         "Upsert must be enabled for table: %s", _tableNameWithType);
 
-    _primaryKeyColumns = schema.getPrimaryKeyColumns();
-    Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns),
+    List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
+    Preconditions.checkArgument(!CollectionUtils.isEmpty(primaryKeyColumns),
         "Primary key columns must be configured for upsert enabled table: %s", _tableNameWithType);
 
-    _comparisonColumns = upsertConfig.getComparisonColumns();
-    if (_comparisonColumns == null) {
-      _comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
+    List<String> comparisonColumns = upsertConfig.getComparisonColumns();
+    if (comparisonColumns == null) {
+      comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
     }
 
-    _deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
-    _hashFunction = upsertConfig.getHashFunction();
-
+    PartialUpsertHandler partialUpsertHandler = null;
     if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
       Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
       Preconditions.checkArgument(partialUpsertStrategies != null,
           "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
-      _partialUpsertHandler =
+      partialUpsertHandler =
           new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
-              _comparisonColumns);
+              comparisonColumns);
     }
 
-    _enableSnapshot = upsertConfig.isEnableSnapshot();
-    _metadataTTL = upsertConfig.getMetadataTTL();
-    _deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
-    _tableIndexDir = tableDataManager.getTableDataDir();
-    _serverMetrics = serverMetrics;
-    _helixManager = helixManager;
-    _segmentPreloadExecutor = segmentPreloadExecutor;
-
-    initCustomVariables();
-
+    String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
+    HashFunction hashFunction = upsertConfig.getHashFunction();
+    boolean enableSnapshot = upsertConfig.isEnableSnapshot();
+    boolean enablePreload = upsertConfig.isEnablePreload();
+    double metadataTTL = upsertConfig.getMetadataTTL();
+    double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
+    File tableIndexDir = tableDataManager.getTableDataDir();
+    _context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema)
+        .setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns)
+        .setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction)
+        .setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload)
+        .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setTableIndexDir(tableIndexDir).build();
     LOGGER.info(
         "Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {},"
             + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {},"
             + " deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType,
-        _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, upsertConfig.getMode(),
-        _enableSnapshot, upsertConfig.isEnablePreload(), _metadataTTL, _deletedKeysTTL, _tableIndexDir);
+        primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, upsertConfig.getMode(), enableSnapshot,
+        enablePreload, metadataTTL, deletedKeysTTL, tableIndexDir);
+
+    initCustomVariables();
 
-    if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) {
+    if (enableSnapshot && enablePreload && segmentPreloadExecutor != null) {
       // Preloading the segments with snapshots for fast upsert metadata recovery.
       // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
       // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
@@ -226,7 +216,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   @VisibleForTesting
   IndexLoadingConfig createIndexLoadingConfig() {
     return new IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(),
-        _tableConfig, _schema);
+        _context.getTableConfig(), _context.getSchema());
   }
 
   @VisibleForTesting
@@ -266,7 +256,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   }
 
   private File getValidDocIdsSnapshotFile(String segmentName, String segmentTier) {
-    File indexDir = _tableDataManager.getSegmentDataDir(segmentName, segmentTier, _tableConfig);
+    File indexDir = _tableDataManager.getSegmentDataDir(segmentName, segmentTier, _context.getTableConfig());
     return new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
   }
 
@@ -277,6 +267,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
 
   @Override
   public UpsertConfig.Mode getUpsertMode() {
-    return _partialUpsertHandler == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL;
+    return _context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL;
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index fe36054f18..576d679368 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,9 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,7 +28,6 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
@@ -38,7 +35,6 @@ import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.roaringbitmap.PeekableIntIterator;
@@ -58,12 +54,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   @VisibleForTesting
   final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
-  public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
-      List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
-      HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) {
-    super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn,
-        hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, deletedKeysTTL, tableIndexDir, serverMetrics);
+  public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
+    super(tableNameWithType, partitionId, context);
   }
 
   @Override
@@ -268,15 +260,15 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
     int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
     if (numDeletedTTLKeys > 0) {
-      _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}",
-          numDeletedTTLKeys, _tableNameWithType);
+      _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}", numDeletedTTLKeys,
+          _tableNameWithType);
       _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
           numDeletedTTLKeys);
     }
     int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get();
     if (numMetadataTTLKeys > 0) {
-      _logger.info("Deleted {} primary keys based on metadataTTL in the table {}",
-          numMetadataTTLKeys, _tableNameWithType);
+      _logger.info("Deleted {} primary keys based on metadataTTL in the table {}", numMetadataTTLKeys,
+          _tableNameWithType);
       _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED,
           numMetadataTTLKeys);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 2aeee10749..b0593f8d5f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -35,9 +35,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
   @Override
   public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
-        k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
-            _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
-            _enableSnapshot, _metadataTTL, _deletedKeysTTL, _tableIndexDir, _serverMetrics));
+        k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index b6ae51b265..2ac107d790 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.helix.HelixManager;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -35,8 +34,9 @@ import org.apache.pinot.spi.data.Schema;
  */
 @ThreadSafe
 public interface TableUpsertMetadataManager extends Closeable {
-  void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics,
-      HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor);
+
+  void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, HelixManager helixManager,
+      @Nullable ExecutorService segmentPreloadExecutor);
 
   PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId);
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
new file mode 100644
index 0000000000..707c22151c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class UpsertContext {
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final List<String> _primaryKeyColumns;
+  private final List<String> _comparisonColumns;
+  private final String _deleteRecordColumn;
+  private final HashFunction _hashFunction;
+  private final PartialUpsertHandler _partialUpsertHandler;
+  private final boolean _enableSnapshot;
+  private final boolean _enablePreload;
+  private final double _metadataTTL;
+  private final double _deletedKeysTTL;
+  private final File _tableIndexDir;
+
+  private UpsertContext(TableConfig tableConfig, Schema schema, List<String> primaryKeyColumns,
+      List<String> comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction,
+      @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, boolean enablePreload,
+      double metadataTTL, double deletedKeysTTL, File tableIndexDir) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _primaryKeyColumns = primaryKeyColumns;
+    _comparisonColumns = comparisonColumns;
+    _deleteRecordColumn = deleteRecordColumn;
+    _hashFunction = hashFunction;
+    _partialUpsertHandler = partialUpsertHandler;
+    _enableSnapshot = enableSnapshot;
+    _enablePreload = enablePreload;
+    _metadataTTL = metadataTTL;
+    _deletedKeysTTL = deletedKeysTTL;
+    _tableIndexDir = tableIndexDir;
+  }
+
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  public List<String> getPrimaryKeyColumns() {
+    return _primaryKeyColumns;
+  }
+
+  public List<String> getComparisonColumns() {
+    return _comparisonColumns;
+  }
+
+  public String getDeleteRecordColumn() {
+    return _deleteRecordColumn;
+  }
+
+  public HashFunction getHashFunction() {
+    return _hashFunction;
+  }
+
+  public PartialUpsertHandler getPartialUpsertHandler() {
+    return _partialUpsertHandler;
+  }
+
+  public boolean isSnapshotEnabled() {
+    return _enableSnapshot;
+  }
+
+  public boolean isPreloadEnabled() {
+    return _enablePreload;
+  }
+
+  public double getMetadataTTL() {
+    return _metadataTTL;
+  }
+
+  public double getDeletedKeysTTL() {
+    return _deletedKeysTTL;
+  }
+
+  public File getTableIndexDir() {
+    return _tableIndexDir;
+  }
+
+  public static class Builder {
+    private TableConfig _tableConfig;
+    private Schema _schema;
+    private List<String> _primaryKeyColumns;
+    private List<String> _comparisonColumns;
+    private String _deleteRecordColumn;
+    private HashFunction _hashFunction = HashFunction.NONE;
+    private PartialUpsertHandler _partialUpsertHandler;
+    private boolean _enableSnapshot;
+    private boolean _enablePreload;
+    private double _metadataTTL;
+    private double _deletedKeysTTL;
+    private File _tableIndexDir;
+
+    public Builder setTableConfig(TableConfig tableConfig) {
+      _tableConfig = tableConfig;
+      return this;
+    }
+
+    public Builder setSchema(Schema schema) {
+      _schema = schema;
+      return this;
+    }
+
+    public Builder setPrimaryKeyColumns(List<String> primaryKeyColumns) {
+      _primaryKeyColumns = primaryKeyColumns;
+      return this;
+    }
+
+    public Builder setComparisonColumns(List<String> comparisonColumns) {
+      _comparisonColumns = comparisonColumns;
+      return this;
+    }
+
+    public Builder setDeleteRecordColumn(String deleteRecordColumn) {
+      _deleteRecordColumn = deleteRecordColumn;
+      return this;
+    }
+
+    public Builder setHashFunction(HashFunction hashFunction) {
+      _hashFunction = hashFunction;
+      return this;
+    }
+
+    public Builder setPartialUpsertHandler(PartialUpsertHandler partialUpsertHandler) {
+      _partialUpsertHandler = partialUpsertHandler;
+      return this;
+    }
+
+    public Builder setEnableSnapshot(boolean enableSnapshot) {
+      _enableSnapshot = enableSnapshot;
+      return this;
+    }
+
+    public Builder setEnablePreload(boolean enablePreload) {
+      _enablePreload = enablePreload;
+      return this;
+    }
+
+    public Builder setMetadataTTL(double metadataTTL) {
+      _metadataTTL = metadataTTL;
+      return this;
+    }
+
+    public Builder setDeletedKeysTTL(double deletedKeysTTL) {
+      _deletedKeysTTL = deletedKeysTTL;
+      return this;
+    }
+
+    public Builder setTableIndexDir(File tableIndexDir) {
+      _tableIndexDir = tableIndexDir;
+      return this;
+    }
+
+    public UpsertContext build() {
+      Preconditions.checkState(_tableConfig != null, "Table config must be set");
+      Preconditions.checkState(_schema != null, "Schema must be set");
+      Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns), "Primary key columns must be set");
+      Preconditions.checkState(CollectionUtils.isNotEmpty(_comparisonColumns), "Comparison columns must be set");
+      Preconditions.checkState(_hashFunction != null, "Hash function must be set");
+      Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set");
+      return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn,
+          _hashFunction, _partialUpsertHandler, _enableSnapshot, _enablePreload, _metadataTTL, _deletedKeysTTL,
+          _tableIndexDir);
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index fe17e40dc2..38bc66ade5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -40,21 +40,35 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class MutableSegmentImplUpsertComparisonColTest {
   private static final String SCHEMA_FILE_PATH = "data/test_upsert_comparison_col_schema.json";
   private static final String DATA_FILE_PATH = "data/test_upsert_comparison_col_data.json";
-  private static CompositeTransformer _recordTransformer;
-  private static Schema _schema;
-  private static TableConfig _tableConfig;
-  private static MutableSegmentImpl _mutableSegmentImpl;
-  private static PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+  private TableDataManager _tableDataManager;
+  private TableConfig _tableConfig;
+  private Schema _schema;
+  private CompositeTransformer _recordTransformer;
+  private MutableSegmentImpl _mutableSegmentImpl;
+  private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+
+  @BeforeClass
+  public void setUp() {
+    ServerMetrics.register(mock(ServerMetrics.class));
+    _tableDataManager = mock(TableDataManager.class);
+    when(_tableDataManager.getTableDataDir()).thenReturn(new File(REALTIME_TABLE_NAME));
+  }
 
   private UpsertConfig createFullUpsertConfig(HashFunction hashFunction) {
     UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL);
@@ -67,20 +81,19 @@ public class MutableSegmentImplUpsertComparisonColTest {
       throws Exception {
     URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
     URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
-    _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfig)
-            .build();
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build();
+    _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig);
-    tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class),
-        mock(HelixManager.class), mock(ExecutorService.class));
+    tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, mock(HelixManager.class),
+        mock(ExecutorService.class));
     _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
-            Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch",
-            _partitionUpsertMetadataManager, null);
+            Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch", _partitionUpsertMetadataManager,
+            null);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
         _schema.getColumnNames(), null)) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 4f243c6000..702bd47f80 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -41,22 +41,36 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class MutableSegmentImplUpsertTest {
   private static final String SCHEMA_FILE_PATH = "data/test_upsert_schema.json";
   private static final String DATA_FILE_PATH = "data/test_upsert_data.json";
-  private CompositeTransformer _recordTransformer;
-  private Schema _schema;
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+  private TableDataManager _tableDataManager;
   private TableConfig _tableConfig;
+  private Schema _schema;
+  private CompositeTransformer _recordTransformer;
   private MutableSegmentImpl _mutableSegmentImpl;
   private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
 
+  @BeforeClass
+  public void setUp() {
+    ServerMetrics.register(mock(ServerMetrics.class));
+    _tableDataManager = mock(TableDataManager.class);
+    when(_tableDataManager.getTableDataDir()).thenReturn(new File(REALTIME_TABLE_NAME));
+  }
+
   private UpsertConfig createPartialUpsertConfig(HashFunction hashFunction) {
     UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
     upsertConfigWithHash.setPartialUpsertStrategies(new HashMap<>());
@@ -76,15 +90,15 @@ public class MutableSegmentImplUpsertTest {
       throws Exception {
     URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
     URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
-    _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfigWithHash)
             .setNullHandlingEnabled(true).build();
+    _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig);
-    tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class),
-        mock(HelixManager.class), mock(ExecutorService.class));
+    tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, mock(HelixManager.class),
+        mock(ExecutorService.class));
     _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 35e621245c..17a1e048dc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -48,7 +48,9 @@ import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
@@ -58,6 +60,7 @@ import org.mockito.MockedConstruction;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -72,13 +75,25 @@ import static org.testng.Assert.*;
 public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+  private static final List<String> PRIMARY_KEY_COLUMNS = Collections.singletonList("pk");
+  private static final List<String> COMPARISON_COLUMNS = Collections.singletonList("timeCol");
+  private static final String DELETE_RECORD_COLUMN = "deleteCol";
   private static final File INDEX_DIR =
       new File(FileUtils.getTempDirectory(), "ConcurrentMapPartitionUpsertMetadataManagerTest");
 
+  private UpsertContext.Builder _contextBuilder;
+
   @BeforeClass
   public void setUp()
       throws IOException {
     FileUtils.forceMkdir(INDEX_DIR);
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
+  @BeforeMethod
+  public void setUpContextBuilder() {
+    _contextBuilder = new UpsertContext.Builder().setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+        .setPrimaryKeyColumns(PRIMARY_KEY_COLUMNS).setComparisonColumns(COMPARISON_COLUMNS).setTableIndexDir(INDEX_DIR);
   }
 
   @AfterClass
@@ -90,9 +105,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   @Test
   public void testStartFinishOperation() {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, 0, INDEX_DIR,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
 
     // Start 2 operations
     assertTrue(upsertMetadataManager.startOperation());
@@ -150,6 +163,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   @Test
   public void testUpsertMetadataCleanupWithTTLConfig()
       throws IOException {
+    _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30);
     verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120));
     verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120));
     verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120));
@@ -165,6 +179,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   @Test
   public void testGetQueryableDocIds() {
+    _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);
+
     boolean[] deleteFlags1 = new boolean[]{false, false, false, true, true, false};
     int[] docIds1 = new int[]{2, 4, 5};
     MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
@@ -202,11 +218,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
       throws IOException {
-    String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
@@ -354,6 +368,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   @Test
   public void testAddReplaceRemoveSegmentWithRecordDelete()
       throws IOException {
+    _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);
     verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE, false);
     verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5, false);
     verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, false);
@@ -364,12 +379,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction, boolean enableSnapshot)
       throws IOException {
-    String comparisonColumn = "timeCol";
-    String deleteRecordColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0,
-            0, INDEX_DIR, mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(hashFunction).setEnableSnapshot(enableSnapshot).build());
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
@@ -548,11 +560,11 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   }
 
   private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKeys, int[] timestamps,
-                                             @Nullable boolean[] deleteRecordFlags) {
+      @Nullable boolean[] deleteRecordFlags) {
     List<RecordInfo> recordInfoList = new ArrayList<>();
     for (int i = 0; i < numRecords; i++) {
       recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new Integer(timestamps[i]),
-              deleteRecordFlags != null && deleteRecordFlags[i]));
+          deleteRecordFlags != null && deleteRecordFlags[i]));
     }
     return recordInfoList;
   }
@@ -667,11 +679,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddRecord(HashFunction hashFunction)
       throws IOException {
-    String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -760,11 +770,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddOutOfOrderRecord(HashFunction hashFunction)
       throws IOException {
-    String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -828,11 +836,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   }
 
   private void verifyPreloadSegment(HashFunction hashFunction) {
-    String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -876,6 +882,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   @Test
   public void testAddRecordWithDeleteColumn()
       throws IOException {
+    _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);
     verifyAddRecordWithDeleteColumn(HashFunction.NONE);
     verifyAddRecordWithDeleteColumn(HashFunction.MD5);
     verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
@@ -883,12 +890,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
       throws IOException {
-    String comparisonColumn = "timeCol";
-    String deleteColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
-            false, 0, 0, INDEX_DIR, mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // queryableDocIds is same as validDocIds in the absence of delete markers
@@ -985,21 +989,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   @Test
   public void testRemoveExpiredDeletedKeys()
-          throws IOException {
+      throws IOException {
+    _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN).setDeletedKeysTTL(20);
     verifyRemoveExpiredDeletedKeys(HashFunction.NONE);
     verifyRemoveExpiredDeletedKeys(HashFunction.MD5);
     verifyRemoveExpiredDeletedKeys(HashFunction.MURMUR3);
   }
 
   private void verifyRemoveExpiredDeletedKeys(HashFunction hashFunction)
-          throws IOException {
-
-    String comparisonColumn = "timeCol";
-    String deleteColumn = "deleteCol";
+      throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-            new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-                    Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
-                    false, 0, 20, INDEX_DIR, mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -1010,9 +1011,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment1 =
-            mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
+        mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
     upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
-            getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator());
+        getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator());
 
     // Update records from the second segment
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
@@ -1069,16 +1070,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
     assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
   }
 
   private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue)
       throws IOException {
-    File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
-
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, 0, tableDir,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -1095,8 +1098,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
     ImmutableSegmentImpl segment1 =
-        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"),
-            earlierComparisonValue, null);
+        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, earlierComparisonValue,
+            null);
 
     int[] docIds1 = new int[]{0, 1, 2, 3};
     MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
@@ -1141,12 +1144,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddOutOfTTLSegment()
       throws IOException {
-    File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
-
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -1163,8 +1162,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
     ImmutableSegmentImpl segment1 =
-        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"),
-            new Double(80), null);
+        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, new Double(80), null);
 
     int[] docIds1 = new int[]{0, 1, 2, 3};
     MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
@@ -1199,8 +1197,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     validDocIdsSnapshot2.add(docIds2);
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment2 =
-        mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2, Collections.singletonList("timeCol"),
-            new Double(80), validDocIdsSnapshot2);
+        mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2, COMPARISON_COLUMNS, new Double(80),
+            validDocIdsSnapshot2);
     upsertMetadataManager.addSegment(segment2);
     // out of ttl segment should not be added to recordLocationMap
     assertEquals(recordLocationMap.size(), 5);
@@ -1214,12 +1212,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddOutOfTTLSegmentWithRecordDelete()
       throws IOException {
-    String comparisonColumn = "timeCol";
-    String deleteRecordColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
-            0, INDEX_DIR, mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
@@ -1235,8 +1229,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     int[] docIds1 = new int[]{2, 4, 5};
     MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
     validDocIdsSnapshot1.add(docIds1);
-    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithEndTime(1, validDocIds1, queryableDocIds1, primaryKeys1,
-        Collections.singletonList(comparisonColumn), new Double(120), validDocIdsSnapshot1);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithEndTime(1, validDocIds1, queryableDocIds1, primaryKeys1, COMPARISON_COLUMNS,
+            new Double(120), validDocIdsSnapshot1);
 
     // get recordInfo from validDocIdSnapshot.
     // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
@@ -1266,7 +1261,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     validDocIdsSnapshot2.add(docIds2);
     ImmutableSegmentImpl segment2 =
         mockImmutableSegmentWithEndTime(2, validDocIds2, queryableDocIds2, getPrimaryKeyList(numRecords, primaryKeys),
-            Collections.singletonList(comparisonColumn), new Double(40), validDocIdsSnapshot2);
+            COMPARISON_COLUMNS, new Double(40), validDocIdsSnapshot2);
 
     // get recordInfo from validDocIdSnapshot.
     // segment2 snapshot: 3 -> {3, 40}, 4 -> {4, 40}
@@ -1299,12 +1294,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[] deleteFlags,
       MutableRoaringBitmap validDocIdsSnapshot, MutableRoaringBitmap queryableDocIds) {
-    String comparisonColumn = "timeCol";
-    String deleteRecordColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
-            0, INDEX_DIR, mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
 
     try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class,
         (mockReader, context) -> {
@@ -1318,9 +1309,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
       when(segmentMetadata.getTotalDocs()).thenReturn(deleteFlags.length);
       when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
-        this.put(comparisonColumn, columnMetadata);
+        this.put(COMPARISON_COLUMNS.get(0), columnMetadata);
       }});
-      when(columnMetadata.getMaxValue()).thenReturn(null);
 
       ImmutableSegmentImpl segment =
           mockImmutableSegmentWithSegmentMetadata(1, new ThreadSafeMutableRoaringBitmap(), null, null, segmentMetadata,
@@ -1331,12 +1321,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddSegmentForTTL(Comparable comparisonValue)
       throws IOException {
-    File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
-
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -1352,8 +1338,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
     ImmutableSegmentImpl segment1 =
-        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"), -1,
-            null);
+        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, -1, null);
 
     int[] docIds1 = new int[]{0, 1, 2, 3};
     MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
@@ -1395,9 +1380,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private void verifyPersistAndLoadWatermark()
       throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, 0, INDEX_DIR,
-            mock(ServerMetrics.class));
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
 
     double currentTimeMs = System.currentTimeMillis();
     upsertMetadataManager.persistWatermark(currentTimeMs);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
index 526e0920c3..8ceeb4ce0b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
@@ -59,12 +59,19 @@ import static org.testng.Assert.assertTrue;
 public class ConcurrentMapTableUpsertMetadataManagerTest {
   private static final File TEMP_DIR =
       new File(FileUtils.getTempDirectory(), "ConcurrentMapTableUpsertMetadataManagerTest");
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final File TABLE_DATA_DIR = new File(TEMP_DIR, REALTIME_TABLE_NAME);
+
+  private TableDataManager _tableDataManager;
   private ExecutorService _segmentPreloadExecutor;
 
   @BeforeClass
   public void setUp()
       throws Exception {
     FileUtils.deleteQuietly(TEMP_DIR);
+    ServerMetrics.register(mock(ServerMetrics.class));
+    _tableDataManager = mock(TableDataManager.class);
+    when(_tableDataManager.getTableDataDir()).thenReturn(TABLE_DATA_DIR);
     _segmentPreloadExecutor = Executors.newFixedThreadPool(1);
   }
 
@@ -86,16 +93,14 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
     // Preloading is skipped as snapshot is not enabled.
     ConcurrentMapTableUpsertMetadataManager mgr = new ConcurrentMapTableUpsertMetadataManager();
     assertFalse(mgr.isPreloading());
-    mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
-        _segmentPreloadExecutor);
+    mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor);
     assertFalse(mgr.isPreloading());
 
     // Preloading is skipped as preloading is not turned on.
     upsertConfig.setEnableSnapshot(true);
     mgr = new ConcurrentMapTableUpsertMetadataManager();
     assertFalse(mgr.isPreloading());
-    mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
-        _segmentPreloadExecutor);
+    mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor);
     assertFalse(mgr.isPreloading());
 
     upsertConfig.setEnablePreload(true);
@@ -103,8 +108,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
     assertFalse(mgr.isPreloading());
     // The preloading logic will hit on error as the HelixManager mock is not fully setup. But failure of preloading
     // should not fail the init() method.
-    mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
-        _segmentPreloadExecutor);
+    mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor);
     assertFalse(mgr.isPreloading());
   }
 
@@ -141,14 +145,13 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
     };
 
     // Setup mocks for TableConfig and Schema.
-    String tableNameWithType = "myTable_REALTIME";
     TableConfig tableConfig = mock(TableConfig.class);
     UpsertConfig upsertConfig = new UpsertConfig();
     upsertConfig.setComparisonColumn("ts");
     upsertConfig.setEnablePreload(true);
     upsertConfig.setEnableSnapshot(true);
     when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
-    when(tableConfig.getTableName()).thenReturn(tableNameWithType);
+    when(tableConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
     Schema schema = mock(Schema.class);
     when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
 
@@ -169,16 +172,18 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
     SegmentZKMetadata realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg01");
     realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
     when(propertyStore.get(
-        eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg01")), any(),
+        eq(ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, "online_seg01")), any(),
         anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
     realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg02");
     realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
     when(propertyStore.get(
-        eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg02")), any(),
+        eq(ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, "online_seg02")), any(),
         anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
 
     // No snapshot file for online_seg01, so it's skipped.
     TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.getTableDataDir()).thenReturn(TABLE_DATA_DIR);
+
     File seg01IdxDir = new File(TEMP_DIR, "online_seg01");
     FileUtils.forceMkdir(seg01IdxDir);
     when(tableDataManager.getSegmentDataDir("online_seg01", null, tableConfig)).thenReturn(seg01IdxDir);
@@ -189,7 +194,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
     when(tableDataManager.getSegmentDataDir("online_seg02", null, tableConfig)).thenReturn(seg02IdxDir);
 
     assertFalse(mgr.isPreloading());
-    mgr.init(tableConfig, schema, tableDataManager, mock(ServerMetrics.class), helixManager, _segmentPreloadExecutor);
+    mgr.init(tableConfig, schema, tableDataManager, helixManager, _segmentPreloadExecutor);
     assertEquals(preloadedSegments.size(), 1);
     assertTrue(preloadedSegments.contains("online_seg02"));
     assertTrue(wasPreloading.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org