You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/11 07:52:57 UTC

[pinot] branch master updated: allow to preload segments with upsert snapshots to speedup table loading (#11020)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bb7e6a7b0 allow to preload segments with upsert snapshots to speedup table loading (#11020)
4bb7e6a7b0 is described below

commit 4bb7e6a7b0bf9ee502b814819c7194d4974addc1
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Tue Jul 11 00:52:51 2023 -0700

    allow to preload segments with upsert snapshots to speedup table loading (#11020)
---
 .../core/data/manager/BaseTableDataManager.java    |  11 +-
 .../manager/offline/TableDataManagerProvider.java  |  11 +-
 .../manager/realtime/RealtimeTableDataManager.java |  16 +-
 .../BaseTableDataManagerAcquireSegmentTest.java    |   2 +-
 .../data/manager/BaseTableDataManagerTest.java     |   6 +-
 .../offline/DimensionTableDataManagerTest.java     |   2 +-
 .../realtime/RealtimeTableDataManagerTest.java     |   4 +-
 .../local/data/manager/TableDataManager.java       |  21 +-
 .../upsert/BasePartitionUpsertMetadataManager.java |  74 +++++++-
 .../upsert/BaseTableUpsertMetadataManager.java     | 151 ++++++++++++++-
 ...oncurrentMapPartitionUpsertMetadataManager.java |  14 ++
 .../upsert/PartitionUpsertMetadataManager.java     |   7 +
 .../local/upsert/TableUpsertMetadataManager.java   |   9 +-
 .../upsert/TableUpsertMetadataManagerFactory.java  |   8 +-
 .../MutableSegmentImplUpsertComparisonColTest.java |   5 +-
 .../mutable/MutableSegmentImplUpsertTest.java      |   5 +-
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  56 +++++-
 ...oncurrentMapTableUpsertMetadataManagerTest.java | 211 +++++++++++++++++++++
 .../starter/helix/HelixInstanceDataManager.java    |  20 +-
 .../helix/HelixInstanceDataManagerConfig.java      |   7 +
 .../apache/pinot/server/api/BaseResourceTest.java  |   5 +-
 .../pinot/spi/config/table/UpsertConfig.java       |  11 ++
 .../upsertMeetupRsvp_realtime_table_config.json    |   4 +-
 23 files changed, 626 insertions(+), 34 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 882af138d4..04059f94c1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -99,6 +100,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected File _resourceTmpDir;
   protected Logger _logger;
   protected HelixManager _helixManager;
+  protected ExecutorService _segmentPreloadExecutor;
   protected AuthProvider _authProvider;
   protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
   protected boolean _isStreamSegmentDownloadUntar;
@@ -114,6 +116,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
   @Override
   public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
+      @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
       TableDataManagerParams tableDataManagerParams) {
     LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName());
@@ -123,6 +126,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
     _propertyStore = propertyStore;
     _serverMetrics = serverMetrics;
     _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
 
     _authProvider =
         AuthProviderUtils.extractAuthProvider(toPinotConfiguration(_tableDataManagerConfig.getAuthConfig()), null);
@@ -681,8 +685,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
     return new File(_indexDir, segmentName);
   }
 
-  @VisibleForTesting
-  File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig) {
+  @Override
+  public File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig) {
     if (segmentTier == null) {
       return getSegmentDataDir(segmentName);
     }
@@ -763,7 +767,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
    * object may be created when trying to load the segment, but it's closed if the method
    * returns false; otherwise it's opened and to be referred by ImmutableSegment object.
    */
-  protected boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+  @Override
+  public boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
       SegmentZKMetadata zkMetadata) {
     // Try to recover the segment from potential segment reloading failure.
     String segmentTier = zkMetadata.getTier();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 6b83d33dce..946ecf1565 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -20,8 +20,10 @@ package org.apache.pinot.core.data.manager.offline;
 
 import com.google.common.cache.LoadingCache;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
@@ -60,13 +62,14 @@ public class TableDataManagerProvider {
   public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
       LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
-    return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager,
+    return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, null,
         errorCache, () -> true);
   }
 
   public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
-      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) {
+      @Nullable ExecutorService segmentPreloadExecutor, LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
+      Supplier<Boolean> isServerReadyToServeQueries) {
     TableDataManager tableDataManager;
     switch (tableDataManagerConfig.getTableType()) {
       case OFFLINE:
@@ -90,8 +93,8 @@ public class TableDataManagerProvider {
       default:
         throw new IllegalStateException();
     }
-    tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, errorCache,
-        _tableDataManagerParams);
+    tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager,
+        segmentPreloadExecutor, errorCache, _tableDataManagerParams);
     return tableDataManager;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 967cf48b2a..653d6bba3f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -125,6 +125,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   private TableDedupMetadataManager _tableDedupMetadataManager;
   private TableUpsertMetadataManager _tableUpsertMetadataManager;
+  private boolean _isUpsertEnabled;
   private BooleanSupplier _isTableReadyToConsumeData;
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
@@ -205,7 +206,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
           _tableUpsertMetadataManager);
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
-      _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
+      // While creating _tableUpsertMetadataManager object, some methods want to check if upsert is enabled, so track
+      // this status with a boolean, instead of relying on if _tableUpsertMetadataManager is null or not.
+      _isUpsertEnabled = true;
+      _tableUpsertMetadataManager =
+          TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics, _helixManager,
+              _segmentPreloadExecutor);
     }
 
     // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
@@ -352,7 +358,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   }
 
   public boolean isUpsertEnabled() {
-    return _tableUpsertMetadataManager != null;
+    return _isUpsertEnabled;
   }
 
   public boolean isPartialUpsertEnabled() {
@@ -533,7 +539,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
     SegmentDataManager oldSegmentManager = registerSegment(segmentName, newSegmentManager);
     if (oldSegmentManager == null) {
-      partitionUpsertMetadataManager.addSegment(immutableSegment);
+      if (_tableUpsertMetadataManager.isPreloading()) {
+        partitionUpsertMetadataManager.preloadSegment(immutableSegment);
+      } else {
+        partitionUpsertMetadataManager.addSegment(immutableSegment);
+      }
       _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
     } else {
       IndexSegment oldSegment = oldSegmentManager.getSegment();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index c679a6d2fe..9f1dfbbe0a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -130,7 +130,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
       when(config.getTableDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
     }
     tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
         new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index b4cc98cbf1..7eb2ef0acd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -719,7 +719,7 @@ public class BaseTableDataManagerTest {
 
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
         new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     return tableDataManager;
@@ -728,7 +728,7 @@ public class BaseTableDataManagerTest {
   private static BaseTableDataManager createTableManager(TableDataManagerConfig config, HelixManager helixManager) {
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, null,
         new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     return tableDataManager;
@@ -737,7 +737,7 @@ public class BaseTableDataManagerTest {
   private static OfflineTableDataManager createSpyOfflineTableManager(TableDataManagerConfig tableDataManagerConfig) {
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(tableDataManagerConfig, "dummyInstance", mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
         new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     return Mockito.spy(tableDataManager);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 132b94e72a..3381d9c383 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -133,7 +133,7 @@ public class DimensionTableDataManagerTest {
       when(config.getDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     }
     tableDataManager.init(config, "dummyInstance", helixManager.getHelixPropertyStore(),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, null,
         new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     return tableDataManager;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index 392e562fac..45196e2730 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -100,7 +100,7 @@ public class RealtimeTableDataManagerTest {
     TableConfig tableConfig = setupTableConfig(propertyStore);
     Schema schema = setupSchema(propertyStore);
     tmgr.init(tableDataManagerConfig, "server01", propertyStore,
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
         new TableDataManagerParams(0, false, -1));
 
     // Create a dummy local segment.
@@ -135,7 +135,7 @@ public class RealtimeTableDataManagerTest {
     TableConfig tableConfig = setupTableConfig(propertyStore);
     Schema schema = setupSchema(propertyStore);
     tmgr.init(tableDataManagerConfig, "server01", propertyStore,
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
         new TableDataManagerParams(0, false, -1));
 
     // Create a raw segment and put it in deep store backed by local fs.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index ae53ce3383..45965f4bc4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -22,6 +22,7 @@ import com.google.common.cache.LoadingCache;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.lang3.tuple.Pair;
@@ -34,6 +35,7 @@ import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -48,7 +50,9 @@ public interface TableDataManager {
    */
   void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
-      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, TableDataManagerParams tableDataManagerParams);
+      @Nullable ExecutorService segmentPreloadExecutor,
+      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
+      TableDataManagerParams tableDataManagerParams);
 
   /**
    * Starts the table data manager. Should be called only once after table data manager gets initialized but before
@@ -126,6 +130,21 @@ public interface TableDataManager {
    */
   void removeSegment(String segmentName);
 
+  /**
+   * Try to load a segment from an existing segment directory managed by the server. The segment loading may fail
+   * because the directory may not exist any more, or the segment data has a different crc now, or the existing segment
+   * data got corrupted etc.
+   *
+   * @return true if the segment is loaded successfully from the existing segment directory; false otherwise.
+   */
+  boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata);
+
+  /**
+   * Get the segment data directory, considering the segment tier if provided.
+   */
+  File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig);
+
   /**
    * Returns true if the segment was deleted in the last few minutes.
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index f6eef81335..4632047989 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -173,6 +173,62 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  @Override
+  public void preloadSegment(ImmutableSegment segment) {
+    String segmentName = segment.getSegmentName();
+    if (_stopped) {
+      _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
+      return;
+    }
+    Preconditions.checkArgument(_enableSnapshot, "Snapshot must be enabled to preload segment: {}, table: {}",
+        segmentName, _tableNameWithType);
+    // Note that EmptyIndexSegment should not reach here either, as it doesn't have validDocIds snapshot.
+    Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+        "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
+        _tableNameWithType);
+    _snapshotLock.readLock().lock();
+    startOperation();
+    try {
+      doPreloadSegment((ImmutableSegmentImpl) segment);
+      _trackedSegments.add(segment);
+    } finally {
+      finishOperation();
+      _snapshotLock.readLock().unlock();
+    }
+  }
+
+  private void doPreloadSegment(ImmutableSegmentImpl segment) {
+    String segmentName = segment.getSegmentName();
+    _logger.info("Preloading segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
+    long startTimeMs = System.currentTimeMillis();
+
+    MutableRoaringBitmap validDocIds = segment.loadValidDocIdsFromSnapshot();
+    Preconditions.checkState(validDocIds != null,
+        "Snapshot of validDocIds is required to preload segment: {}, table: {}", segmentName, _tableNameWithType);
+    if (validDocIds.isEmpty()) {
+      _logger.info("Skip preloading segment: {} without valid doc, current primary key count: {}",
+          segment.getSegmentName(), getNumPrimaryKeys());
+      segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
+      return;
+    }
+
+    try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+        _comparisonColumns, _deleteRecordColumn)) {
+      addSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while preloading segment: %s, table: %s", segmentName, _tableNameWithType),
+          e);
+    }
+
+    // Update metrics
+    long numPrimaryKeys = getNumPrimaryKeys();
+    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+        numPrimaryKeys);
+    _logger.info("Finished preloading segment: {} in {}ms, current primary key count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
+  }
+
   /**
    * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
    *       tests. The passed in bitmaps should always be empty.
@@ -180,6 +236,13 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @VisibleForTesting
   public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
+    addSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
+  }
+
+  @VisibleForTesting
+  public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+      boolean isPreloading) {
     String segmentName = segment.getSegmentName();
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
@@ -190,7 +253,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       if (queryableDocIds == null && _deleteRecordColumn != null) {
         queryableDocIds = new ThreadSafeMutableRoaringBitmap();
       }
-      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+      if (isPreloading) {
+        addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
+      } else {
+        addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+      }
     } finally {
       segmentLock.unlock();
     }
@@ -202,6 +269,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
 
+  protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
+    addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+  }
+
   @Override
   public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
     if (_stopped) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 06e112b1e6..c6434ec487 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -18,22 +18,48 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 @ThreadSafe
 public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableUpsertMetadataManager.class);
+  private TableConfig _tableConfig;
+  private Schema _schema;
+  private TableDataManager _tableDataManager;
   protected String _tableNameWithType;
   protected List<String> _primaryKeyColumns;
   protected List<String> _comparisonColumns;
@@ -42,10 +68,14 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   protected PartialUpsertHandler _partialUpsertHandler;
   protected boolean _enableSnapshot;
   protected ServerMetrics _serverMetrics;
+  private volatile boolean _isPreloading = false;
 
   @Override
   public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager,
-      ServerMetrics serverMetrics) {
+      ServerMetrics serverMetrics, HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _tableDataManager = tableDataManager;
     _tableNameWithType = tableConfig.getTableName();
 
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
@@ -75,6 +105,125 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) {
+      // Preloading the segments with snapshots for fast upsert metadata recovery.
+      // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+      // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+      // The thread doing the segment preloading here must complete before the other helix threads start to handle
+      // segment state transitions. This is ensured implicitly because segment preloading happens here when
+      // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+      // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+      try {
+        _isPreloading = true;
+        preloadSegments(helixManager, segmentPreloadExecutor);
+      } catch (Exception e) {
+        // Even if preloading fails, we should continue to complete the initialization, so that TableDataManager can be
+        // created. Once TableDataManager is created, no more segment preloading would happen, and the normal segment
+        // loading logic would be used. The segments not being preloaded successfully here would be loaded via the
+        // normal segment loading logic, the one doing more costly checks on the upsert metadata.
+        LOGGER.warn("Failed to preload segments from table: {}, skipping", _tableNameWithType, e);
+        if (e instanceof InterruptedException) {
+          // Restore the interrupted status in case the upper callers want to check.
+          Thread.currentThread().interrupt();
+        }
+      } finally {
+        _isPreloading = false;
+      }
+    }
+  }
+
+  /**
+   * Get the ideal state and find segments assigned to current instance, then preload those with validDocIds snapshot.
+   * Skip those without the snapshots and those whose crc has changed, as they will be handled by normal Helix state
+   * transitions, which will proceed after the preloading phase fully completes.
+   */
+  private void preloadSegments(HelixManager helixManager, ExecutorService segmentPreloadExecutor)
+      throws Exception {
+    LOGGER.info("Preload segments from table: {} for fast upsert metadata recovery", _tableNameWithType);
+    IdealState idealState = HelixHelper.getTableIdealState(helixManager, _tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore();
+    String instanceId = getInstanceId();
+    IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+    List<Future<?>> futures = new ArrayList<>();
+    for (String segmentName : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+      String state = instanceStateMap.get(instanceId);
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+        LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", segmentName, state);
+        continue;
+      }
+      futures.add(segmentPreloadExecutor.submit(() -> {
+        try {
+          preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+        } catch (Exception e) {
+          LOGGER.warn("Failed to preload segment: {} from table: {}, skipping", segmentName, _tableNameWithType, e);
+        }
+      }));
+    }
+    try {
+      for (Future<?> f : futures) {
+        f.get();
+      }
+    } finally {
+      for (Future<?> f : futures) {
+        if (!f.isDone()) {
+          f.cancel(true);
+        }
+      }
+    }
+    LOGGER.info("Preloaded segments from table: {} for fast upsert metadata recovery", _tableNameWithType);
+  }
+
+  private String getInstanceId() {
+    InstanceDataManagerConfig instanceDataManagerConfig =
+        _tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig();
+    return instanceDataManagerConfig.getInstanceId();
+  }
+
+  @VisibleForTesting
+  protected IndexLoadingConfig createIndexLoadingConfig() {
+    return new IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(),
+        _tableConfig, _schema);
+  }
+
+  private void preloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    LOGGER.info("Preload segment: {} from table: {}", segmentName, _tableNameWithType);
+    SegmentZKMetadata zkMetadata =
+        ZKMetadataProvider.getSegmentZKMetadata(propertyStore, _tableNameWithType, segmentName);
+    Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName,
+        _tableNameWithType);
+    File snapshotFile = getValidDocIdsSnapshotFile(segmentName, zkMetadata.getTier());
+    if (!snapshotFile.exists()) {
+      LOGGER.info("Skip segment: {} as no validDocIds snapshot at: {}", segmentName, snapshotFile);
+      return;
+    }
+    preloadSegmentWithSnapshot(segmentName, indexLoadingConfig, zkMetadata);
+    LOGGER.info("Preloaded segment: {} from table: {}", segmentName, _tableNameWithType);
+  }
+
+  @VisibleForTesting
+  protected void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata) {
+    // This method might modify the file on disk. Use segment lock to prevent race condition
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    try {
+      segmentLock.lock();
+      // This method checks segment crc and if it has changed, the segment is not loaded.
+      _tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata);
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
+  private File getValidDocIdsSnapshotFile(String segmentName, String segmentTier) {
+    File indexDir = _tableDataManager.getSegmentDataDir(segmentName, segmentTier, _tableConfig);
+    return new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+  }
+
+  @Override
+  public boolean isPreloading() {
+    return _isPreloading;
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index a65ce670bd..cae8092b2f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -160,6 +160,20 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     }
   }
 
+  @Override
+  protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
+    segment.enableUpsert(this, validDocIds, queryableDocIds);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      int newDocId = recordInfo.getDocId();
+      Comparable newComparisonValue = recordInfo.getComparisonValue();
+      addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+      _primaryKeyToRecordLocationMap.put(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
+          new RecordLocation(segment, newDocId, newComparisonValue));
+    }
+  }
+
   private static void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
     validDocIds.replace(oldDocId, newDocId);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 55cd8497cb..fbf864bd58 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -64,6 +64,13 @@ public interface PartitionUpsertMetadataManager extends Closeable {
    */
   void addSegment(ImmutableSegment segment);
 
+  /**
+   * Different from adding a segment, when preloading a segment, the upsert metadata may be updated more efficiently.
+   * Basically the upsert metadata can be directly updated for each primary key, without doing the more costly
+   * read-compare-update.
+   */
+  void preloadSegment(ImmutableSegment segment);
+
   /**
    * Updates the upsert metadata for a new consumed record in the given consuming segment.
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 52007f16fe..b6ae51b265 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -19,7 +19,10 @@
 package org.apache.pinot.segment.local.upsert;
 
 import java.io.Closeable;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -32,8 +35,8 @@ import org.apache.pinot.spi.data.Schema;
  */
 @ThreadSafe
 public interface TableUpsertMetadataManager extends Closeable {
-
-  void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics);
+  void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics,
+      HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor);
 
   PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId);
 
@@ -43,4 +46,6 @@ public interface TableUpsertMetadataManager extends Closeable {
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
   void stop();
+
+  boolean isPreloading();
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 307128ff63..3320c60a43 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -19,7 +19,10 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.base.Preconditions;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -36,7 +39,8 @@ public class TableUpsertMetadataManagerFactory {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
 
   public static TableUpsertMetadataManager create(TableConfig tableConfig, Schema schema,
-      TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+      TableDataManager tableDataManager, ServerMetrics serverMetrics, HelixManager helixManager,
+      @Nullable ExecutorService segmentPreloadExecutor) {
     String tableNameWithType = tableConfig.getTableName();
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     Preconditions.checkArgument(upsertConfig != null, "Must provide upsert config for table: %s", tableNameWithType);
@@ -59,7 +63,7 @@ public class TableUpsertMetadataManagerFactory {
       metadataManager = new ConcurrentMapTableUpsertMetadataManager();
     }
 
-    metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics);
+    metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics, helixManager, segmentPreloadExecutor);
     return metadataManager;
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index a8d8e956c4..52f8590020 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -21,6 +21,8 @@ package org.apache.pinot.segment.local.indexsegment.mutable;
 import java.io.File;
 import java.net.URL;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
@@ -67,7 +69,8 @@ public class MutableSegmentImplUpsertComparisonColTest {
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
         TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class),
-            mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
+                mock(ServerMetrics.class), mock(HelixManager.class), mock(ExecutorService.class))
+            .getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, offsetUpsertConfig, "secondsSinceEpoch",
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index afaf2b566a..b4ed454c2d 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -23,6 +23,8 @@ import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
@@ -82,7 +84,8 @@ public class MutableSegmentImplUpsertTest {
     File jsonFile = new File(dataResourceUrl.getFile());
     _partitionUpsertMetadataManager =
         TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class),
-            mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
+                mock(ServerMetrics.class), mock(HelixManager.class), mock(ExecutorService.class))
+            .getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, upsertConfigWithHash, "secondsSinceEpoch",
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 02fa90abdb..466f1885d5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -583,6 +583,58 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
+  @Test
+  public void testPreloadSegment() {
+    verifyPreloadSegment(HashFunction.NONE);
+    verifyPreloadSegment(HashFunction.MD5);
+    verifyPreloadSegment(HashFunction.MURMUR3);
+  }
+
+  private void verifyPreloadSegment(HashFunction hashFunction) {
+    String comparisonColumn = "timeCol";
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add the first segment
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    int numRecords = 3;
+    int[] primaryKeys = new int[]{0, 1, 2};
+    int[] timestamps = new int[]{100, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
+    // Preloading segment adds the segment without checking for upsert.
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+
+    // Add the 2nd segment
+    // segment2: 0 -> {0, 1}, 1 -> {1, 2}
+    numRecords = 2;
+    primaryKeys = new int[]{0, 1};
+    timestamps = new int[]{1, 2};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
+    upsertMetadataManager.addSegment(segment2, validDocIds2, null,
+        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    // segment2: 0 -> {0, 1}, 1 -> {1, 2}
+    // segment2 was preloaded, so new locations got put in tracking map w/o checking on comparison values.
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 1, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment2, 1, 2, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+  }
+
   @Test
   public void testAddRecordWithDeleteColumn()
       throws IOException {
@@ -596,8 +648,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String deleteColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
-            false, mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false,
+            mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // queryableDocIds is same as validDocIds in the absence of delete markers
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
new file mode 100644
index 0000000000..ee1d6ffbd2
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class ConcurrentMapTableUpsertMetadataManagerTest {
+  private static final File TEMP_DIR =
+      new File(FileUtils.getTempDirectory(), "ConcurrentMapTableUpsertMetadataManagerTest");
+  private ExecutorService _segmentPreloadExecutor;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(TEMP_DIR);
+    _segmentPreloadExecutor = Executors.newFixedThreadPool(1);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    FileUtils.deleteQuietly(TEMP_DIR);
+    _segmentPreloadExecutor.shutdownNow();
+  }
+
+  @Test
+  public void testSkipPreloadSegments() {
+    TableConfig tableConfig = mock(TableConfig.class);
+    UpsertConfig upsertConfig = new UpsertConfig();
+    upsertConfig.setComparisonColumn("ts");
+    when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
+    Schema schema = mock(Schema.class);
+    when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
+
+    // Preloading is skipped as snapshot is not enabled.
+    ConcurrentMapTableUpsertMetadataManager mgr = new ConcurrentMapTableUpsertMetadataManager();
+    assertFalse(mgr.isPreloading());
+    mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
+        _segmentPreloadExecutor);
+    assertFalse(mgr.isPreloading());
+
+    // Preloading is skipped as preloading is not turned on.
+    upsertConfig.setEnableSnapshot(true);
+    mgr = new ConcurrentMapTableUpsertMetadataManager();
+    assertFalse(mgr.isPreloading());
+    mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
+        _segmentPreloadExecutor);
+    assertFalse(mgr.isPreloading());
+
+    upsertConfig.setEnablePreload(true);
+    mgr = new ConcurrentMapTableUpsertMetadataManager();
+    assertFalse(mgr.isPreloading());
+    // The preloading logic will hit on error as the HelixManager mock is not fully setup. But failure of preloading
+    // should not fail the init() method.
+    mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
+        _segmentPreloadExecutor);
+    assertFalse(mgr.isPreloading());
+  }
+
+  @Test
+  public void testPreloadOnlineSegments()
+      throws Exception {
+    Set<String> preloadedSegments = new HashSet<>();
+    AtomicBoolean wasPreloading = new AtomicBoolean(false);
+    ConcurrentMapTableUpsertMetadataManager mgr = new ConcurrentMapTableUpsertMetadataManager() {
+      @Override
+      protected IndexLoadingConfig createIndexLoadingConfig() {
+        return mock(IndexLoadingConfig.class);
+      }
+
+      @Override
+      protected void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig,
+          SegmentZKMetadata zkMetadata) {
+        wasPreloading.set(isPreloading());
+        preloadedSegments.add(segmentName);
+      }
+    };
+    // Setup mocks for TableConfig and Schema.
+    String tableNameWithType = "myTable_REALTIME";
+    TableConfig tableConfig = mock(TableConfig.class);
+    UpsertConfig upsertConfig = new UpsertConfig();
+    upsertConfig.setComparisonColumn("ts");
+    upsertConfig.setEnablePreload(true);
+    upsertConfig.setEnableSnapshot(true);
+    when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
+    when(tableConfig.getTableName()).thenReturn(tableNameWithType);
+    Schema schema = mock(Schema.class);
+    when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
+
+    // Setup mocks for HelixManager.
+    HelixManager helixManager = mock(HelixManager.class);
+    IdealState idealState = mock(IdealState.class);
+    HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
+    PropertyKey.Builder keyBuilder = mock(PropertyKey.Builder.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+    PropertyKey propKey = mock(PropertyKey.class);
+    when(helixManager.getHelixDataAccessor()).thenReturn(dataAccessor);
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    when(dataAccessor.keyBuilder()).thenReturn(keyBuilder);
+    when(keyBuilder.idealStates(anyString())).thenReturn(propKey);
+    when(dataAccessor.getProperty(propKey)).thenReturn(idealState);
+
+    // Setup mocks to return the instanceId.
+    String instanceId = "server01";
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    TableDataManagerConfig tdmc = mock(TableDataManagerConfig.class);
+    InstanceDataManagerConfig idmc = mock(InstanceDataManagerConfig.class);
+    when(tableDataManager.getTableDataManagerConfig()).thenReturn(tdmc);
+    when(tdmc.getInstanceDataManagerConfig()).thenReturn(idmc);
+    when(idmc.getInstanceId()).thenReturn(instanceId);
+
+    // Only ONLINE segments are preloaded.
+    Map<String, Map<String, String>> segStates = new HashMap<>();
+    segStates.put("consuming_seg01", ImmutableMap.of(instanceId, "CONSUMING"));
+    segStates.put("consuming_seg02", ImmutableMap.of(instanceId, "CONSUMING"));
+    segStates.put("online_seg01", ImmutableMap.of(instanceId, "ONLINE"));
+    segStates.put("online_seg02", ImmutableMap.of(instanceId, "ONLINE"));
+    segStates.put("offline_seg01", ImmutableMap.of(instanceId, "OFFLINE"));
+    segStates.put("offline_seg02", ImmutableMap.of(instanceId, "OFFLINE"));
+    when(idealState.getPartitionSet()).thenReturn(segStates.keySet());
+    for (String segName : segStates.keySet()) {
+      when(idealState.getInstanceStateMap(segName)).thenReturn(segStates.get(segName));
+    }
+
+    // Setup mocks to get file path to validDocIds snapshot.
+    SegmentZKMetadata realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg01");
+    realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    when(propertyStore.get(
+        eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg01")), any(),
+        anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
+    realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg02");
+    realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    when(propertyStore.get(
+        eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg02")), any(),
+        anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
+
+    // No snapshot file for online_seg01, so it's skipped.
+    File seg01IdxDir = new File(TEMP_DIR, "online_seg01");
+    FileUtils.forceMkdir(seg01IdxDir);
+    when(tableDataManager.getSegmentDataDir("online_seg01", null, tableConfig)).thenReturn(seg01IdxDir);
+
+    File seg02IdxDir = new File(TEMP_DIR, "online_seg02");
+    FileUtils.forceMkdir(seg02IdxDir);
+    FileUtils.touch(new File(new File(seg02IdxDir, "v3"), V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+    when(tableDataManager.getSegmentDataDir("online_seg02", null, tableConfig)).thenReturn(seg02IdxDir);
+
+    assertFalse(mgr.isPreloading());
+    mgr.init(tableConfig, schema, tableDataManager, mock(ServerMetrics.class), helixManager, _segmentPreloadExecutor);
+    assertEquals(preloadedSegments.size(), 1);
+    assertTrue(preloadedSegments.contains("online_seg02"));
+    assertTrue(wasPreloading.get());
+    assertFalse(mgr.isPreloading());
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 92a82f133b..fea900201b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -99,6 +99,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   // Key is TableNameWithType-SegmentName pair.
   private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;
   private ExecutorService _segmentRefreshExecutor;
+  private ExecutorService _segmentPreloadExecutor;
 
   @Override
   public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) {
@@ -133,8 +134,20 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     SegmentBuildTimeLeaseExtender.initExecutor();
     // Initialize a fixed thread pool to reload/refresh segments in parallel. The getMaxParallelRefreshThreads() is
     // used to initialize a segment refresh semaphore to limit the parallelism, so create a pool of same size.
-    _segmentRefreshExecutor = Executors.newFixedThreadPool(getMaxParallelRefreshThreads(),
+    int poolSize = getMaxParallelRefreshThreads();
+    Preconditions.checkArgument(poolSize > 0,
+        "SegmentRefreshExecutor requires a positive pool size but got: " + poolSize);
+    _segmentRefreshExecutor = Executors.newFixedThreadPool(poolSize,
         new ThreadFactoryBuilder().setNameFormat("segment-refresh-thread-%d").build());
+    LOGGER.info("Created SegmentRefreshExecutor with pool size: {}", poolSize);
+    poolSize = _instanceDataManagerConfig.getMaxSegmentPreloadThreads();
+    if (poolSize > 0) {
+      _segmentPreloadExecutor = Executors.newFixedThreadPool(poolSize,
+          new ThreadFactoryBuilder().setNameFormat("segment-preload-thread-%d").build());
+      LOGGER.info("Created SegmentPreloadExecutor with pool size: {}", poolSize);
+    } else {
+      LOGGER.info("SegmentPreloadExecutor was not created with pool size: {}", poolSize);
+    }
     // Initialize the table data manager provider
     TableDataManagerProvider.init(_instanceDataManagerConfig);
     LOGGER.info("Initialized Helix instance data manager");
@@ -190,6 +203,9 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   @Override
   public synchronized void shutDown() {
     _segmentRefreshExecutor.shutdownNow();
+    if (_segmentPreloadExecutor != null) {
+      _segmentPreloadExecutor.shutdownNow();
+    }
     for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
       tableDataManager.shutDown();
     }
@@ -231,7 +247,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     TableDataManagerConfig tableDataManagerConfig = new TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
     TableDataManager tableDataManager =
         TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, _instanceId, _propertyStore,
-            _serverMetrics, _helixManager, _errorCache, _isServerReadyToServeQueries);
+            _serverMetrics, _helixManager, _segmentPreloadExecutor, _errorCache, _isServerReadyToServeQueries);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 8f31fe1b0b..67a28dd448 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -121,6 +121,9 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   //
   private static final String MAX_PARALLEL_REFRESH_THREADS = "max.parallel.refresh.threads";
 
+  // To preload segments of table using upsert in parallel for fast upsert metadata recovery.
+  private static final String MAX_SEGMENT_PRELOAD_THREADS = "max.segment.preload.threads";
+
   // Size of cache that holds errors.
   private static final String ERROR_CACHE_SIZE = "error.cache.size";
 
@@ -240,6 +243,10 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
     return _instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_REFRESH_THREADS, 1);
   }
 
+  public int getMaxSegmentPreloadThreads() {
+    return _instanceDataManagerConfiguration.getProperty(MAX_SEGMENT_PRELOAD_THREADS, 0);
+  }
+
   public int getMaxParallelSegmentBuilds() {
     return _instanceDataManagerConfiguration
         .getProperty(MAX_PARALLEL_SEGMENT_BUILDS, DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS);
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 9fd8c2ca4d..2065471438 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -193,9 +193,8 @@ public abstract class BaseResourceTest {
     // NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table because RealtimeTableDataManager requires
     //       table config.
     TableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager
-        .init(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class),
-            mock(HelixManager.class), null, new TableDataManagerParams(0, false, -1));
+    tableDataManager.init(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
+        mock(ServerMetrics.class), mock(HelixManager.class), null, null, new TableDataManagerParams(0, false, -1));
     tableDataManager.start();
     _tableDataManagerMap.put(tableNameWithType, tableDataManager);
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index c589b73a01..a9981de4ff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -60,6 +60,9 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
   private boolean _enableSnapshot;
 
+  @JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery")
+  private boolean _enablePreload;
+
   @JsonPropertyDescription("Custom class for upsert metadata manager")
   private String _metadataManagerClass;
 
@@ -108,6 +111,10 @@ public class UpsertConfig extends BaseJsonConfig {
     return _enableSnapshot;
   }
 
+  public boolean isEnablePreload() {
+    return _enablePreload;
+  }
+
   @Nullable
   public String getMetadataManagerClass() {
     return _metadataManagerClass;
@@ -172,6 +179,10 @@ public class UpsertConfig extends BaseJsonConfig {
     _enableSnapshot = enableSnapshot;
   }
 
+  public void setEnablePreload(boolean enablePreload) {
+    _enablePreload = enablePreload;
+  }
+
   public void setMetadataManagerClass(String metadataManagerClass) {
     _metadataManagerClass = metadataManagerClass;
   }
diff --git a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
index ecf92d5871..d7fa20d8fc 100644
--- a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
@@ -57,6 +57,8 @@
     "instanceSelectorType": "strictReplicaGroup"
   },
   "upsertConfig": {
-    "mode": "FULL"
+    "mode": "FULL",
+    "enableSnapshot": "true",
+    "enablePreload": "true"
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org