You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/19 19:10:28 UTC

[pinot] branch master updated: For dedup/partial-upsert, check all segments loaded when creating the consuming segment (#8923)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 517b8813b8 For dedup/partial-upsert, check all segments loaded when creating the consuming segment (#8923)
517b8813b8 is described below

commit 517b8813b8029bfcb75f90db043817256efce997
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Jun 19 12:10:23 2022 -0700

    For dedup/partial-upsert, check all segments loaded when creating the consuming segment (#8923)
---
 .../manager/realtime/RealtimeTableDataManager.java | 181 +++++++++++----------
 .../local/dedup/PartitionDedupMetadataManager.java |  73 +++------
 .../local/dedup/TableDedupMetadataManager.java     |   9 +-
 .../segment/local/upsert/PartialUpsertHandler.java |  29 +---
 .../upsert/PartitionUpsertMetadataManager.java     |  13 +-
 .../local/upsert/TableUpsertMetadataManager.java   |   4 +
 .../local/utils/tablestate/TableStateUtils.java    |  18 +-
 .../dedup/PartitionDedupMetadataManagerTest.java   | 109 +++++--------
 .../mutable/MutableSegmentDedupeTest.java          |  19 +--
 .../local/upsert/PartialUpsertHandlerTest.java     |  16 +-
 10 files changed, 186 insertions(+), 285 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 36740212fc..195e5e023e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager.realtime;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
@@ -64,10 +64,10 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.utils.RecordInfo;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
+import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.DedupConfig;
-import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -115,9 +115,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   // likely that we get fresh data each time instead of multiple copies of roughly same data.
   private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
 
-  private UpsertConfig.Mode _upsertMode;
-  private TableUpsertMetadataManager _tableUpsertMetadataManager;
+  private final AtomicBoolean _allSegmentsLoaded = new AtomicBoolean();
+
   private TableDedupMetadataManager _tableDedupMetadataManager;
+  private TableUpsertMetadataManager _tableUpsertMetadataManager;
   private List<String> _primaryKeyColumns;
   private String _upsertComparisonColumn;
 
@@ -133,9 +134,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     try {
       _statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
     } catch (IOException | ClassNotFoundException e) {
-      _logger
-          .error("Error reading history object for table {} from {}", _tableNameWithType, statsFile.getAbsolutePath(),
-              e);
+      _logger.error("Error reading history object for table {} from {}", _tableNameWithType,
+          statsFile.getAbsolutePath(), e);
       File savedFile = new File(_tableDataDir, STATS_FILE_NAME + "." + UUID.randomUUID());
       try {
         FileUtils.moveFile(statsFile, savedFile);
@@ -156,64 +156,62 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
     String consumerDirPath = getConsumerDir();
     File consumerDir = new File(consumerDirPath);
+    if (consumerDir.exists()) {
+      File[] segmentFiles = consumerDir.listFiles((dir, name) -> !name.equals(STATS_FILE_NAME));
+      Preconditions.checkState(segmentFiles != null, "Failed to list segment files from consumer dir: {} for table: {}",
+          consumerDirPath, _tableNameWithType);
+      for (File file : segmentFiles) {
+        if (file.delete()) {
+          _logger.info("Deleted old file {}", file.getAbsolutePath());
+        } else {
+          _logger.error("Cannot delete file {}", file.getAbsolutePath());
+        }
+      }
+    }
 
-    // NOTE: Upsert has to be set up when starting the server. Changing the table config without restarting the server
-    //       won't enable/disable the upsert on the fly.
+    // Set up dedup/upsert metadata manager
+    // NOTE: Dedup/upsert has to be set up when starting the server. Changing the table config without restarting the
+    //       server won't enable/disable them on the fly.
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType);
-    _upsertMode = tableConfig.getUpsertMode();
-    if (tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled()) {
+
+    DedupConfig dedupConfig = tableConfig.getDedupConfig();
+    boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
+    if (dedupEnabled) {
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
+
       _primaryKeyColumns = schema.getPrimaryKeyColumns();
-      DedupConfig dedupConfig = tableConfig.getDedupConfig();
-      HashFunction dedupHashFunction = dedupConfig.getHashFunction();
-      _tableDedupMetadataManager =
-          new TableDedupMetadataManager(_helixManager, _tableNameWithType, _primaryKeyColumns, _serverMetrics,
-              dedupHashFunction);
+      Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
+          "Primary key columns must be configured for dedup");
+      _tableDedupMetadataManager = new TableDedupMetadataManager(_tableNameWithType, _primaryKeyColumns, _serverMetrics,
+          dedupConfig.getHashFunction());
     }
 
-    if (isUpsertEnabled()) {
-      UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
-      assert upsertConfig != null;
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) {
+      Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s",
+          _tableUpsertMetadataManager);
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
 
-      PartialUpsertHandler partialUpsertHandler = null;
-      if (isPartialUpsertEnabled()) {
-        String comparisonColumn = upsertConfig.getComparisonColumn();
-        if (comparisonColumn == null) {
-          comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
-        }
-        partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema,
-            upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(),
-            comparisonColumn);
-      }
-      HashFunction hashFunction = upsertConfig.getHashFunction();
-      _tableUpsertMetadataManager =
-          new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler, hashFunction);
       _primaryKeyColumns = schema.getPrimaryKeyColumns();
       Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
           "Primary key columns must be configured for upsert");
       String comparisonColumn = upsertConfig.getComparisonColumn();
       _upsertComparisonColumn =
           comparisonColumn != null ? comparisonColumn : tableConfig.getValidationConfig().getTimeColumnName();
-    }
 
-    if (consumerDir.exists()) {
-      File[] segmentFiles = consumerDir.listFiles(new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-          return !name.equals(STATS_FILE_NAME);
-        }
-      });
-      for (File file : segmentFiles) {
-        if (file.delete()) {
-          _logger.info("Deleted old file {}", file.getAbsolutePath());
-        } else {
-          _logger.error("Cannot delete file {}", file.getAbsolutePath());
-        }
+      PartialUpsertHandler partialUpsertHandler = null;
+      if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
+        assert upsertConfig.getPartialUpsertStrategies() != null;
+        partialUpsertHandler = new PartialUpsertHandler(schema, upsertConfig.getPartialUpsertStrategies(),
+            upsertConfig.getDefaultPartialUpsertStrategy(), _upsertComparisonColumn);
       }
+
+      _tableUpsertMetadataManager =
+          new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler,
+              upsertConfig.getHashFunction());
     }
   }
 
@@ -266,11 +264,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   }
 
   public boolean isUpsertEnabled() {
-    return _upsertMode != UpsertConfig.Mode.NONE;
+    return _tableUpsertMetadataManager != null;
   }
 
   public boolean isPartialUpsertEnabled() {
-    return _upsertMode == UpsertConfig.Mode.PARTIAL;
+    return _tableUpsertMetadataManager != null && _tableUpsertMetadataManager.isPartialUpsertEnabled();
   }
 
   /*
@@ -358,8 +356,19 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
           _tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(
               partitionGroupId) : null;
       PartitionDedupMetadataManager partitionDedupMetadataManager =
-          _tableDedupMetadataManager != null ? _tableDedupMetadataManager
-              .getOrCreatePartitionManager(partitionGroupId) : null;
+          _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
+              : null;
+      // For dedup and partial-upsert, wait for all segments loaded before creating the consuming segment
+      if (isDedupEnabled() || isPartialUpsertEnabled()) {
+        if (!_allSegmentsLoaded.get()) {
+          synchronized (_allSegmentsLoaded) {
+            if (!_allSegmentsLoaded.get()) {
+              TableStateUtils.waitForAllSegmentsLoaded(_helixManager, _tableNameWithType);
+              _allSegmentsLoaded.set(true);
+            }
+          }
+        }
+      }
       segmentDataManager =
           new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
               indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
@@ -390,10 +399,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private void buildDedupMeta(ImmutableSegmentImpl immutableSegment) {
     // TODO(saurabh) refactor commons code with handleUpsert
     String segmentName = immutableSegment.getSegmentName();
-    Integer partitionGroupId = SegmentUtils
-        .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
-    Preconditions.checkNotNull(partitionGroupId, String
-        .format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
+    Integer partitionGroupId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager,
+            _primaryKeyColumns.get(0));
+    Preconditions.checkNotNull(partitionGroupId,
+        String.format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
             _tableNameWithType));
     PartitionDedupMetadataManager partitionDedupMetadataManager =
         _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
@@ -403,10 +413,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
     String segmentName = immutableSegment.getSegmentName();
-    Integer partitionGroupId = SegmentUtils
-        .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
-    Preconditions.checkNotNull(partitionGroupId, String
-        .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
+    Integer partitionGroupId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager,
+            _primaryKeyColumns.get(0));
+    Preconditions.checkNotNull(partitionGroupId,
+        String.format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
             _tableNameWithType));
     PartitionUpsertMetadataManager partitionUpsertMetadataManager =
         _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
@@ -417,37 +428,35 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     for (String primaryKeyColumn : _primaryKeyColumns) {
       columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(immutableSegment, primaryKeyColumn));
     }
-    columnToReaderMap
-        .put(_upsertComparisonColumn, new PinotSegmentColumnReader(immutableSegment, _upsertComparisonColumn));
+    columnToReaderMap.put(_upsertComparisonColumn,
+        new PinotSegmentColumnReader(immutableSegment, _upsertComparisonColumn));
     int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
     int numPrimaryKeyColumns = _primaryKeyColumns.size();
-    Iterator<RecordInfo> recordInfoIterator =
-        new Iterator<RecordInfo>() {
-          private int _docId = 0;
+    Iterator<RecordInfo> recordInfoIterator = new Iterator<RecordInfo>() {
+      private int _docId = 0;
 
-          @Override
-          public boolean hasNext() {
-            return _docId < numTotalDocs;
-          }
+      @Override
+      public boolean hasNext() {
+        return _docId < numTotalDocs;
+      }
 
-          @Override
-          public RecordInfo next() {
-            Object[] values = new Object[numPrimaryKeyColumns];
-            for (int i = 0; i < numPrimaryKeyColumns; i++) {
-              Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
-              if (value instanceof byte[]) {
-                value = new ByteArray((byte[]) value);
-              }
-              values[i] = value;
-            }
-            PrimaryKey primaryKey = new PrimaryKey(values);
-            Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
-            Preconditions.checkState(upsertComparisonValue instanceof Comparable,
-                "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
-            return new RecordInfo(primaryKey, _docId++,
-                (Comparable) upsertComparisonValue);
+      @Override
+      public RecordInfo next() {
+        Object[] values = new Object[numPrimaryKeyColumns];
+        for (int i = 0; i < numPrimaryKeyColumns; i++) {
+          Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
+          if (value instanceof byte[]) {
+            value = new ByteArray((byte[]) value);
           }
-        };
+          values[i] = value;
+        }
+        PrimaryKey primaryKey = new PrimaryKey(values);
+        Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
+        Preconditions.checkState(upsertComparisonValue instanceof Comparable,
+            "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
+        return new RecordInfo(primaryKey, _docId++, (Comparable) upsertComparisonValue);
+      }
+    };
     partitionUpsertMetadataManager.addSegment(immutableSegment, recordInfoIterator);
   }
 
@@ -526,8 +535,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
     return
         CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
-            || CommonConstants.HTTPS_PROTOCOL
-            .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+            || CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(
+            tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
   }
 
   private void downloadSegmentFromPeer(String segmentName, String downloadScheme,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 353d33c951..258aad7a5d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -24,25 +24,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class PartitionDedupMetadataManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionDedupMetadataManager.class);
-  private static boolean _allSegmentsLoaded;
-
-  private final HelixManager _helixManager;
   private final String _tableNameWithType;
   private final List<String> _primaryKeyColumns;
   private final int _partitionId;
@@ -52,9 +44,8 @@ public class PartitionDedupMetadataManager {
   @VisibleForTesting
   final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
 
-  public PartitionDedupMetadataManager(HelixManager helixManager, String tableNameWithType,
-      List<String> primaryKeyColumns, int partitionId, ServerMetrics serverMetrics, HashFunction hashFunction) {
-    _helixManager = helixManager;
+  public PartitionDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
+      ServerMetrics serverMetrics, HashFunction hashFunction) {
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _partitionId = partitionId;
@@ -64,7 +55,7 @@ public class PartitionDedupMetadataManager {
 
   public void addSegment(IndexSegment segment) {
     // Add all PKs to _primaryKeyToSegmentMap
-    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment, _primaryKeyColumns);
+    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
     while (primaryKeyIterator.hasNext()) {
       PrimaryKey pk = primaryKeyIterator.next();
       _primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction), segment);
@@ -75,30 +66,29 @@ public class PartitionDedupMetadataManager {
 
   public void removeSegment(IndexSegment segment) {
     // TODO(saurabh): Explain reload scenario here
-    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment, _primaryKeyColumns);
+    Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
     while (primaryKeyIterator.hasNext()) {
       PrimaryKey pk = primaryKeyIterator.next();
-      _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, _hashFunction),
-          (primaryKey, currentSegment) -> {
-            if (currentSegment == segment) {
-              return null;
-            } else {
-              return currentSegment;
-            }
-          });
+      _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, _hashFunction), (primaryKey, currentSegment) -> {
+        if (currentSegment == segment) {
+          return null;
+        } else {
+          return currentSegment;
+        }
+      });
     }
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
         _primaryKeyToSegmentMap.size());
   }
 
   @VisibleForTesting
-  public static Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment, List<String> primaryKeyColumns) {
+  Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
     Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
-    for (String primaryKeyColumn : primaryKeyColumns) {
+    for (String primaryKeyColumn : _primaryKeyColumns) {
       columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(segment, primaryKeyColumn));
     }
     int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
-    int numPrimaryKeyColumns = primaryKeyColumns.size();
+    int numPrimaryKeyColumns = _primaryKeyColumns.size();
     return new Iterator<PrimaryKey>() {
       private int _docId = 0;
 
@@ -111,7 +101,7 @@ public class PartitionDedupMetadataManager {
       public PrimaryKey next() {
         Object[] values = new Object[numPrimaryKeyColumns];
         for (int i = 0; i < numPrimaryKeyColumns; i++) {
-          Object value = columnToReaderMap.get(primaryKeyColumns.get(i)).getValue(_docId);
+          Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
           if (value instanceof byte[]) {
             value = new ByteArray((byte[]) value);
           }
@@ -123,34 +113,13 @@ public class PartitionDedupMetadataManager {
     };
   }
 
-  private synchronized void waitTillAllSegmentsLoaded() {
-    if (_allSegmentsLoaded) {
-      return;
-    }
-
-    while (!TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType)) {
-      LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
-      try {
-        //noinspection BusyWait
-        Thread.sleep(1000L);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    _allSegmentsLoaded = true;
-  }
-
   public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) {
-    if (!_allSegmentsLoaded) {
-      waitTillAllSegmentsLoaded();
+    boolean present =
+        _primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment) != null;
+    if (!present) {
+      _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+          _primaryKeyToSegmentMap.size());
     }
-
-    boolean result =
-        (_primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment)
-            != null);
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
-        _primaryKeyToSegmentMap.size());
-
-    return result;
+    return present;
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index 6e25974970..dedcebfff0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -21,22 +21,19 @@ package org.apache.pinot.segment.local.dedup;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.spi.config.table.HashFunction;
 
 
 public class TableDedupMetadataManager {
   private final Map<Integer, PartitionDedupMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>();
-  private final HelixManager _helixManager;
   private final String _tableNameWithType;
   private final List<String> _primaryKeyColumns;
   private final ServerMetrics _serverMetrics;
   private final HashFunction _hashFunction;
 
-  public TableDedupMetadataManager(HelixManager helixManager, String tableNameWithType, List<String> primaryKeyColumns,
+  public TableDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns,
       ServerMetrics serverMetrics, HashFunction hashFunction) {
-    _helixManager = helixManager;
     _tableNameWithType = tableNameWithType;
     _primaryKeyColumns = primaryKeyColumns;
     _serverMetrics = serverMetrics;
@@ -45,7 +42,7 @@ public class TableDedupMetadataManager {
 
   public PartitionDedupMetadataManager getOrCreatePartitionManager(int partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
-        k -> new PartitionDedupMetadataManager(_helixManager, _tableNameWithType, _primaryKeyColumns, k,
-            _serverMetrics, _hashFunction));
+        k -> new PartitionDedupMetadataManager(_tableNameWithType, _primaryKeyColumns, k, _serverMetrics,
+            _hashFunction));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 720dc8a181..4a1cfad39f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -20,29 +20,22 @@ package org.apache.pinot.segment.local.upsert;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
+
 /**
  * Handler for partial-upsert.
  */
 public class PartialUpsertHandler {
   // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final HelixManager _helixManager;
-  private final String _tableNameWithType;
   private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private boolean _allSegmentsLoaded;
 
-  public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType, Schema schema,
-      Map<String, UpsertConfig.Strategy> partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy,
-      String comparisonColumn) {
-    _helixManager = helixManager;
-    _tableNameWithType = tableNameWithType;
+  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
+      UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) {
     for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
       _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
     }
@@ -56,18 +49,6 @@ public class PartialUpsertHandler {
     }
   }
 
-  /**
-   * Returns {@code true} if all segments assigned to the current instance are loaded, {@code false} otherwise.
-   * Consuming segment should perform this check to ensure all previous records are loaded before inserting new records.
-   */
-  public synchronized boolean isAllSegmentsLoaded() {
-    if (_allSegmentsLoaded) {
-      return true;
-    }
-    _allSegmentsLoaded = TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
-    return _allSegmentsLoaded;
-  }
-
   /**
    * Merges 2 records and returns the merged record.
    * We used a map to indicate all configured fields for partial upsert. For these fields
@@ -91,8 +72,8 @@ public class PartialUpsertHandler {
           newRecord.putValue(column, previousRecord.getValue(column));
           newRecord.removeNullValueField(column);
         } else {
-          newRecord
-              .putValue(column, entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column)));
+          newRecord.putValue(column,
+              entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column)));
         }
       }
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 02289cd1c1..041a86443a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory;
  *   </li>
  * </ul>
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
+@SuppressWarnings("unchecked")
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
@@ -204,17 +204,6 @@ public class PartitionUpsertMetadataManager {
       return record;
     }
 
-    // Ensure all previous records are loaded before inserting new records
-    while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
-      LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
-      try {
-        //noinspection BusyWait
-        Thread.sleep(1000L);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
     RecordLocation currentRecordLocation =
         _primaryKeyToRecordLocationMap.get(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction));
     if (currentRecordLocation != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 6ccd9315a6..384268a494 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -50,4 +50,8 @@ public class TableUpsertMetadataManager {
         k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, _serverMetrics, _partialUpsertHandler,
             _hashFunction));
   }
+
+  public boolean isPartialUpsertEnabled() {
+    return _partialUpsertHandler != null;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index b83ac7d51d..2392a0bcd1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -29,6 +29,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class TableStateUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableStateUtils.class);
 
@@ -72,8 +73,8 @@ public class TableStateUtils {
       String actualState = currentStateMap.get(segmentName);
       if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
         if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
-          LOGGER
-              .error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType, expectedState);
+          LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
+              expectedState);
         } else {
           LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName, tableNameWithType,
               expectedState, actualState);
@@ -85,4 +86,17 @@ public class TableStateUtils {
     LOGGER.info("All segments loaded for table: {}", tableNameWithType);
     return true;
   }
+
+  public static void waitForAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
+    try {
+      while (!TableStateUtils.isAllSegmentsLoaded(helixManager, tableNameWithType)) {
+        LOGGER.info("Sleeping 1 second waiting for all segments loaded for table: {}", tableNameWithType);
+        //noinspection BusyWait
+        Thread.sleep(1000L);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Caught exception while waiting for all segments loaded for table: " + tableNameWithType, e);
+    }
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
index 124aad1c63..39f7d6ae98 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
@@ -19,27 +19,22 @@
 package org.apache.pinot.segment.local.dedup;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.local.utils.RecordInfo;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.mockito.MockedStatic;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertSame;
@@ -49,19 +44,12 @@ public class PartitionDedupMetadataManagerTest {
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
-  @BeforeClass
-  public void init() {
-    MockedStatic mocked = mockStatic(TableStateUtils.class);
-    mocked.when(() -> TableStateUtils.isAllSegmentsLoaded(any(), any())).thenReturn(true);
-  }
-
   @Test
   public void verifyAddRemoveSegment() {
     HashFunction hashFunction = HashFunction.NONE;
-    PartitionDedupMetadataManager partitionDedupMetadataManager =
-        new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
-            mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+    TestMetadataManager metadataManager =
+        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+    Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
 
     // Add the first segment
     List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -71,41 +59,24 @@ public class PartitionDedupMetadataManagerTest {
     pkList1.add(getPrimaryKey(0));
     pkList1.add(getPrimaryKey(1));
     pkList1.add(getPrimaryKey(0));
+    metadataManager._primaryKeyIterator = pkList1.iterator();
     ImmutableSegmentImpl segment1 = mockSegment(1);
-    MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-
-    partitionDedupMetadataManager.addSegment(segment1);
+    metadataManager.addSegment(segment1);
     checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
 
-    pkList1 = new ArrayList<>();
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(2));
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(0));
-
-    mocked.close();
-    mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-
-    partitionDedupMetadataManager.removeSegment(segment1);
+    metadataManager._primaryKeyIterator = pkList1.iterator();
+    metadataManager.removeSegment(segment1);
     Assert.assertEquals(recordLocationMap.size(), 0);
-    mocked.close();
   }
 
   @Test
   public void verifyReloadSegment() {
     HashFunction hashFunction = HashFunction.NONE;
-    PartitionDedupMetadataManager partitionDedupMetadataManager =
-        new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
-            mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+    TestMetadataManager metadataManager =
+        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+    Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
 
     // Add the first segment
     List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -115,45 +86,28 @@ public class PartitionDedupMetadataManagerTest {
     pkList1.add(getPrimaryKey(0));
     pkList1.add(getPrimaryKey(1));
     pkList1.add(getPrimaryKey(0));
+    metadataManager._primaryKeyIterator = pkList1.iterator();
     ImmutableSegmentImpl segment1 = mockSegment(1);
-    MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-
-    partitionDedupMetadataManager.addSegment(segment1);
+    metadataManager.addSegment(segment1);
 
     // Remove another segment with same PK rows
-    pkList1 = new ArrayList<>();
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(2));
-    pkList1.add(getPrimaryKey(0));
-    pkList1.add(getPrimaryKey(1));
-    pkList1.add(getPrimaryKey(0));
+    metadataManager._primaryKeyIterator = pkList1.iterator();
     ImmutableSegmentImpl segment2 = mockSegment(1);
-
-    mocked.close();
-    mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-
-    partitionDedupMetadataManager.removeSegment(segment2);
+    metadataManager.removeSegment(segment2);
     Assert.assertEquals(recordLocationMap.size(), 3);
 
     // Keys should still exist
     checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
     checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
     checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
-    mocked.close();
   }
 
   @Test
   public void verifyAddRow() {
     HashFunction hashFunction = HashFunction.NONE;
-    PartitionDedupMetadataManager partitionDedupMetadataManager =
-        new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
-            mock(ServerMetrics.class), hashFunction);
-    Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+    TestMetadataManager metadataManager =
+        new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+    Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
 
     // Add the first segment
     List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -163,28 +117,25 @@ public class PartitionDedupMetadataManagerTest {
     pkList1.add(getPrimaryKey(0));
     pkList1.add(getPrimaryKey(1));
     pkList1.add(getPrimaryKey(0));
+    metadataManager._primaryKeyIterator = pkList1.iterator();
     ImmutableSegmentImpl segment1 = mockSegment(1);
-    MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
-    mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
-        .thenReturn(pkList1.iterator());
-    partitionDedupMetadataManager.addSegment(segment1);
-    mocked.close();
+    metadataManager.addSegment(segment1);
 
     // Same PK exists
     RecordInfo recordInfo = mock(RecordInfo.class);
     when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(0));
     ImmutableSegmentImpl segment2 = mockSegment(2);
-    Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
     checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
 
     // New PK
     when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
-    Assert.assertFalse(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
     checkRecordLocation(recordLocationMap, 3, segment2, hashFunction);
 
     // Same PK as the one recently ingested
     when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
-    Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
   }
 
   private static ImmutableSegmentImpl mockSegment(int sequenceNumber) {
@@ -208,4 +159,18 @@ public class PartitionDedupMetadataManagerTest {
     assertNotNull(indexSegment);
     assertSame(indexSegment, segment);
   }
+
+  private static class TestMetadataManager extends PartitionDedupMetadataManager {
+    Iterator<PrimaryKey> _primaryKeyIterator;
+
+    TestMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
+        ServerMetrics serverMetrics, HashFunction hashFunction) {
+      super(tableNameWithType, primaryKeyColumns, partitionId, serverMetrics, hashFunction);
+    }
+
+    @Override
+    Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
+      return _primaryKeyIterator;
+    }
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index 7acfbdccdc..6f310c863c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -22,12 +22,10 @@ package org.apache.pinot.segment.local.indexsegment.mutable;
 import java.io.File;
 import java.net.URL;
 import java.util.Collections;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -38,28 +36,16 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mockStatic;
-
 
 public class MutableSegmentDedupeTest {
-
   private static final String SCHEMA_FILE_PATH = "data/test_dedup_schema.json";
   private static final String DATA_FILE_PATH = "data/test_dedup_data.json";
   private MutableSegmentImpl _mutableSegmentImpl;
 
-  @BeforeClass
-  public void init() {
-    MockedStatic mocked = mockStatic(TableStateUtils.class);
-    mocked.when(() -> TableStateUtils.isAllSegmentsLoaded(any(), any())).thenReturn(true);
-  }
-
   private void setup(boolean dedupEnabled)
       throws Exception {
     URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
@@ -70,9 +56,8 @@ public class MutableSegmentDedupeTest {
     CompositeTransformer recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     PartitionDedupMetadataManager partitionDedupMetadataManager =
-        (dedupEnabled) ? new TableDedupMetadataManager(Mockito.mock(HelixManager.class), "testTable_REALTIME",
-            schema.getPrimaryKeyColumns(), Mockito.mock(ServerMetrics.class),
-            HashFunction.NONE).getOrCreatePartitionManager(0) : null;
+        (dedupEnabled) ? new TableDedupMetadataManager("testTable_REALTIME", schema.getPrimaryKeyColumns(),
+            Mockito.mock(ServerMetrics.class), HashFunction.NONE).getOrCreatePartitionManager(0) : null;
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, null, "secondsSinceEpoch", null, partitionDedupMetadataManager);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 471a46c803..31a508e988 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -21,12 +21,10 @@ package org.apache.pinot.segment.local.upsert;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.helix.HelixManager;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -38,19 +36,14 @@ public class PartialUpsertHandlerTest {
 
   @Test
   public void testMerge() {
-    HelixManager helixManager = Mockito.mock(HelixManager.class);
-
     Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
         .addSingleValueDimension("field1", FieldSpec.DataType.LONG)
         .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
         .setPrimaryKeyColumns(Arrays.asList("pk")).build();
-
-    String realtimeTableName = "testTable_REALTIME";
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
     partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
     PartialUpsertHandler handler =
-        new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
-            UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+        new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
 
     // both records are null.
     GenericRow previousRecord = new GenericRow();
@@ -97,19 +90,14 @@ public class PartialUpsertHandlerTest {
 
   @Test
   public void testMergeWithDefaultPartialUpsertStrategy() {
-    HelixManager helixManager = Mockito.mock(HelixManager.class);
-
     Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
         .addSingleValueDimension("field1", FieldSpec.DataType.LONG).addMetric("field2", FieldSpec.DataType.LONG)
         .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
         .setPrimaryKeyColumns(Arrays.asList("pk")).build();
-
-    String realtimeTableName = "testTable_REALTIME";
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
     partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
     PartialUpsertHandler handler =
-        new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
-            UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+        new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
 
     // previousRecord is null default value, while newRecord is not.
     GenericRow previousRecord = new GenericRow();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org