You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/11 07:52:57 UTC
[pinot] branch master updated: allow to preload segments with upsert snapshots to speedup table loading (#11020)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4bb7e6a7b0 allow to preload segments with upsert snapshots to speedup table loading (#11020)
4bb7e6a7b0 is described below
commit 4bb7e6a7b0bf9ee502b814819c7194d4974addc1
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Tue Jul 11 00:52:51 2023 -0700
allow to preload segments with upsert snapshots to speedup table loading (#11020)
---
.../core/data/manager/BaseTableDataManager.java | 11 +-
.../manager/offline/TableDataManagerProvider.java | 11 +-
.../manager/realtime/RealtimeTableDataManager.java | 16 +-
.../BaseTableDataManagerAcquireSegmentTest.java | 2 +-
.../data/manager/BaseTableDataManagerTest.java | 6 +-
.../offline/DimensionTableDataManagerTest.java | 2 +-
.../realtime/RealtimeTableDataManagerTest.java | 4 +-
.../local/data/manager/TableDataManager.java | 21 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 74 +++++++-
.../upsert/BaseTableUpsertMetadataManager.java | 151 ++++++++++++++-
...oncurrentMapPartitionUpsertMetadataManager.java | 14 ++
.../upsert/PartitionUpsertMetadataManager.java | 7 +
.../local/upsert/TableUpsertMetadataManager.java | 9 +-
.../upsert/TableUpsertMetadataManagerFactory.java | 8 +-
.../MutableSegmentImplUpsertComparisonColTest.java | 5 +-
.../mutable/MutableSegmentImplUpsertTest.java | 5 +-
...rrentMapPartitionUpsertMetadataManagerTest.java | 56 +++++-
...oncurrentMapTableUpsertMetadataManagerTest.java | 211 +++++++++++++++++++++
.../starter/helix/HelixInstanceDataManager.java | 20 +-
.../helix/HelixInstanceDataManagerConfig.java | 7 +
.../apache/pinot/server/api/BaseResourceTest.java | 5 +-
.../pinot/spi/config/table/UpsertConfig.java | 11 ++
.../upsertMeetupRsvp_realtime_table_config.json | 4 +-
23 files changed, 626 insertions(+), 34 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 882af138d4..04059f94c1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -99,6 +100,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
protected File _resourceTmpDir;
protected Logger _logger;
protected HelixManager _helixManager;
+ protected ExecutorService _segmentPreloadExecutor;
protected AuthProvider _authProvider;
protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
protected boolean _isStreamSegmentDownloadUntar;
@@ -114,6 +116,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
@Override
public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
+ @Nullable ExecutorService segmentPreloadExecutor,
@Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
TableDataManagerParams tableDataManagerParams) {
LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName());
@@ -123,6 +126,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
_propertyStore = propertyStore;
_serverMetrics = serverMetrics;
_helixManager = helixManager;
+ _segmentPreloadExecutor = segmentPreloadExecutor;
_authProvider =
AuthProviderUtils.extractAuthProvider(toPinotConfiguration(_tableDataManagerConfig.getAuthConfig()), null);
@@ -681,8 +685,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
return new File(_indexDir, segmentName);
}
- @VisibleForTesting
- File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig) {
+ @Override
+ public File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig) {
if (segmentTier == null) {
return getSegmentDataDir(segmentName);
}
@@ -763,7 +767,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
* object may be created when trying to load the segment, but it's closed if the method
* returns false; otherwise it's opened and to be referred by ImmutableSegment object.
*/
- protected boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+ @Override
+ public boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
SegmentZKMetadata zkMetadata) {
// Try to recover the segment from potential segment reloading failure.
String segmentTier = zkMetadata.getTier();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 6b83d33dce..946ecf1565 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -20,8 +20,10 @@ package org.apache.pinot.core.data.manager.offline;
import com.google.common.cache.LoadingCache;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
@@ -60,13 +62,14 @@ public class TableDataManagerProvider {
public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
- return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager,
+ return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, null,
errorCache, () -> true);
}
public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
- LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) {
+ @Nullable ExecutorService segmentPreloadExecutor, LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
+ Supplier<Boolean> isServerReadyToServeQueries) {
TableDataManager tableDataManager;
switch (tableDataManagerConfig.getTableType()) {
case OFFLINE:
@@ -90,8 +93,8 @@ public class TableDataManagerProvider {
default:
throw new IllegalStateException();
}
- tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, errorCache,
- _tableDataManagerParams);
+ tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager,
+ segmentPreloadExecutor, errorCache, _tableDataManagerParams);
return tableDataManager;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 967cf48b2a..653d6bba3f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -125,6 +125,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
private TableDedupMetadataManager _tableDedupMetadataManager;
private TableUpsertMetadataManager _tableUpsertMetadataManager;
+ private boolean _isUpsertEnabled;
private BooleanSupplier _isTableReadyToConsumeData;
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
@@ -205,7 +206,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
_tableUpsertMetadataManager);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
- _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
+ // While creating _tableUpsertMetadataManager object, some methods want to check if upsert is enabled, so track
+ // this status with a boolean, instead of relying on if _tableUpsertMetadataManager is null or not.
+ _isUpsertEnabled = true;
+ _tableUpsertMetadataManager =
+ TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics, _helixManager,
+ _segmentPreloadExecutor);
}
// For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
@@ -352,7 +358,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
}
public boolean isUpsertEnabled() {
- return _tableUpsertMetadataManager != null;
+ return _isUpsertEnabled;
}
public boolean isPartialUpsertEnabled() {
@@ -533,7 +539,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
SegmentDataManager oldSegmentManager = registerSegment(segmentName, newSegmentManager);
if (oldSegmentManager == null) {
- partitionUpsertMetadataManager.addSegment(immutableSegment);
+ if (_tableUpsertMetadataManager.isPreloading()) {
+ partitionUpsertMetadataManager.preloadSegment(immutableSegment);
+ } else {
+ partitionUpsertMetadataManager.addSegment(immutableSegment);
+ }
_logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
} else {
IndexSegment oldSegment = oldSegmentManager.getSegment();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index c679a6d2fe..9f1dfbbe0a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -130,7 +130,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
when(config.getTableDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
}
tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index b4cc98cbf1..7eb2ef0acd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -719,7 +719,7 @@ public class BaseTableDataManagerTest {
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
return tableDataManager;
@@ -728,7 +728,7 @@ public class BaseTableDataManagerTest {
private static BaseTableDataManager createTableManager(TableDataManagerConfig config, HelixManager helixManager) {
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
return tableDataManager;
@@ -737,7 +737,7 @@ public class BaseTableDataManagerTest {
private static OfflineTableDataManager createSpyOfflineTableManager(TableDataManagerConfig tableDataManagerConfig) {
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(tableDataManagerConfig, "dummyInstance", mock(ZkHelixPropertyStore.class),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
return Mockito.spy(tableDataManager);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 132b94e72a..3381d9c383 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -133,7 +133,7 @@ public class DimensionTableDataManagerTest {
when(config.getDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
}
tableDataManager.init(config, "dummyInstance", helixManager.getHelixPropertyStore(),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, null,
new TableDataManagerParams(0, false, -1));
tableDataManager.start();
return tableDataManager;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index 392e562fac..45196e2730 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -100,7 +100,7 @@ public class RealtimeTableDataManagerTest {
TableConfig tableConfig = setupTableConfig(propertyStore);
Schema schema = setupSchema(propertyStore);
tmgr.init(tableDataManagerConfig, "server01", propertyStore,
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));
// Create a dummy local segment.
@@ -135,7 +135,7 @@ public class RealtimeTableDataManagerTest {
TableConfig tableConfig = setupTableConfig(propertyStore);
Schema schema = setupSchema(propertyStore);
tmgr.init(tableDataManagerConfig, "server01", propertyStore,
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, null,
new TableDataManagerParams(0, false, -1));
// Create a raw segment and put it in deep store backed by local fs.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index ae53ce3383..45965f4bc4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -22,6 +22,7 @@ import com.google.common.cache.LoadingCache;
import java.io.File;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.tuple.Pair;
@@ -34,6 +35,7 @@ import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -48,7 +50,9 @@ public interface TableDataManager {
*/
void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
- LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, TableDataManagerParams tableDataManagerParams);
+ @Nullable ExecutorService segmentPreloadExecutor,
+ @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
+ TableDataManagerParams tableDataManagerParams);
/**
* Starts the table data manager. Should be called only once after table data manager gets initialized but before
@@ -126,6 +130,21 @@ public interface TableDataManager {
*/
void removeSegment(String segmentName);
+ /**
+ * Try to load a segment from an existing segment directory managed by the server. The segment loading may fail
+ * because the directory may not exist any more, or the segment data has a different crc now, or the existing segment
+ * data got corrupted etc.
+ *
+ * @return true if the segment is loaded successfully from the existing segment directory; false otherwise.
+ */
+ boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+ SegmentZKMetadata zkMetadata);
+
+ /**
+ * Get the segment data directory, considering the segment tier if provided.
+ */
+ File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig);
+
/**
* Returns true if the segment was deleted in the last few minutes.
*/
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index f6eef81335..4632047989 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -173,6 +173,62 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
+ @Override
+ public void preloadSegment(ImmutableSegment segment) {
+ String segmentName = segment.getSegmentName();
+ if (_stopped) {
+ _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
+ return;
+ }
+ Preconditions.checkArgument(_enableSnapshot, "Snapshot must be enabled to preload segment: {}, table: {}",
+ segmentName, _tableNameWithType);
+ // Note that EmptyIndexSegment should not reach here either, as it doesn't have validDocIds snapshot.
+ Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+ "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
+ _tableNameWithType);
+ _snapshotLock.readLock().lock();
+ startOperation();
+ try {
+ doPreloadSegment((ImmutableSegmentImpl) segment);
+ _trackedSegments.add(segment);
+ } finally {
+ finishOperation();
+ _snapshotLock.readLock().unlock();
+ }
+ }
+
+ private void doPreloadSegment(ImmutableSegmentImpl segment) {
+ String segmentName = segment.getSegmentName();
+ _logger.info("Preloading segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
+ long startTimeMs = System.currentTimeMillis();
+
+ MutableRoaringBitmap validDocIds = segment.loadValidDocIdsFromSnapshot();
+ Preconditions.checkState(validDocIds != null,
+ "Snapshot of validDocIds is required to preload segment: {}, table: {}", segmentName, _tableNameWithType);
+ if (validDocIds.isEmpty()) {
+ _logger.info("Skip preloading segment: {} without valid doc, current primary key count: {}",
+ segment.getSegmentName(), getNumPrimaryKeys());
+ segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
+ return;
+ }
+
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+ _comparisonColumns, _deleteRecordColumn)) {
+ addSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Caught exception while preloading segment: %s, table: %s", segmentName, _tableNameWithType),
+ e);
+ }
+
+ // Update metrics
+ long numPrimaryKeys = getNumPrimaryKeys();
+ _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+ numPrimaryKeys);
+ _logger.info("Finished preloading segment: {} in {}ms, current primary key count: {}", segmentName,
+ System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
+ }
+
/**
* NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
* tests. The passed in bitmaps should always be empty.
@@ -180,6 +236,13 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@VisibleForTesting
public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
+ addSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
+ }
+
+ @VisibleForTesting
+ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+ boolean isPreloading) {
String segmentName = segment.getSegmentName();
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
segmentLock.lock();
@@ -190,7 +253,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
if (queryableDocIds == null && _deleteRecordColumn != null) {
queryableDocIds = new ThreadSafeMutableRoaringBitmap();
}
- addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+ if (isPreloading) {
+ addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
+ } else {
+ addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+ }
} finally {
segmentLock.unlock();
}
@@ -202,6 +269,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
@Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
+ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
+ addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+ }
+
@Override
public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
if (_stopped) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 06e112b1e6..c6434ec487 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -18,22 +18,48 @@
*/
package org.apache.pinot.segment.local.upsert;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@ThreadSafe
public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableUpsertMetadataManager.class);
+ private TableConfig _tableConfig;
+ private Schema _schema;
+ private TableDataManager _tableDataManager;
protected String _tableNameWithType;
protected List<String> _primaryKeyColumns;
protected List<String> _comparisonColumns;
@@ -42,10 +68,14 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected PartialUpsertHandler _partialUpsertHandler;
protected boolean _enableSnapshot;
protected ServerMetrics _serverMetrics;
+ private volatile boolean _isPreloading = false;
@Override
public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager,
- ServerMetrics serverMetrics) {
+ ServerMetrics serverMetrics, HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor) {
+ _tableConfig = tableConfig;
+ _schema = schema;
+ _tableDataManager = tableDataManager;
_tableNameWithType = tableConfig.getTableName();
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
@@ -75,6 +105,125 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
_enableSnapshot = upsertConfig.isEnableSnapshot();
_serverMetrics = serverMetrics;
+ if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) {
+ // Preloading the segments with snapshots for fast upsert metadata recovery.
+ // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+ // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+ // The thread doing the segment preloading here must complete before the other helix threads start to handle
+ // segment state transitions. This is ensured implicitly because segment preloading happens here when
+ // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+ // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+ try {
+ _isPreloading = true;
+ preloadSegments(helixManager, segmentPreloadExecutor);
+ } catch (Exception e) {
+ // Even if preloading fails, we should continue to complete the initialization, so that TableDataManager can be
+ // created. Once TableDataManager is created, no more segment preloading would happen, and the normal segment
+ // loading logic would be used. The segments not being preloaded successfully here would be loaded via the
+ // normal segment loading logic, the one doing more costly checks on the upsert metadata.
+ LOGGER.warn("Failed to preload segments from table: {}, skipping", _tableNameWithType, e);
+ if (e instanceof InterruptedException) {
+ // Restore the interrupted status in case the upper callers want to check.
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ _isPreloading = false;
+ }
+ }
+ }
+
+ /**
+ * Get the ideal state and find segments assigned to current instance, then preload those with validDocIds snapshot.
+ * Skip those without the snapshots and those whose crc has changed, as they will be handled by normal Helix state
+ * transitions, which will proceed after the preloading phase fully completes.
+ */
+ private void preloadSegments(HelixManager helixManager, ExecutorService segmentPreloadExecutor)
+ throws Exception {
+ LOGGER.info("Preload segments from table: {} for fast upsert metadata recovery", _tableNameWithType);
+ IdealState idealState = HelixHelper.getTableIdealState(helixManager, _tableNameWithType);
+ ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore();
+ String instanceId = getInstanceId();
+ IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+ List<Future<?>> futures = new ArrayList<>();
+ for (String segmentName : idealState.getPartitionSet()) {
+ Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+ String state = instanceStateMap.get(instanceId);
+ if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+ LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", segmentName, state);
+ continue;
+ }
+ futures.add(segmentPreloadExecutor.submit(() -> {
+ try {
+ preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to preload segment: {} from table: {}, skipping", segmentName, _tableNameWithType, e);
+ }
+ }));
+ }
+ try {
+ for (Future<?> f : futures) {
+ f.get();
+ }
+ } finally {
+ for (Future<?> f : futures) {
+ if (!f.isDone()) {
+ f.cancel(true);
+ }
+ }
+ }
+ LOGGER.info("Preloaded segments from table: {} for fast upsert metadata recovery", _tableNameWithType);
+ }
+
+ private String getInstanceId() {
+ InstanceDataManagerConfig instanceDataManagerConfig =
+ _tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig();
+ return instanceDataManagerConfig.getInstanceId();
+ }
+
+ @VisibleForTesting
+ protected IndexLoadingConfig createIndexLoadingConfig() {
+ return new IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(),
+ _tableConfig, _schema);
+ }
+
+ private void preloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+ ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ LOGGER.info("Preload segment: {} from table: {}", segmentName, _tableNameWithType);
+ SegmentZKMetadata zkMetadata =
+ ZKMetadataProvider.getSegmentZKMetadata(propertyStore, _tableNameWithType, segmentName);
+ Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName,
+ _tableNameWithType);
+ File snapshotFile = getValidDocIdsSnapshotFile(segmentName, zkMetadata.getTier());
+ if (!snapshotFile.exists()) {
+ LOGGER.info("Skip segment: {} as no validDocIds snapshot at: {}", segmentName, snapshotFile);
+ return;
+ }
+ preloadSegmentWithSnapshot(segmentName, indexLoadingConfig, zkMetadata);
+ LOGGER.info("Preloaded segment: {} from table: {}", segmentName, _tableNameWithType);
+ }
+
+ @VisibleForTesting
+ protected void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig,
+ SegmentZKMetadata zkMetadata) {
+ // This method might modify the file on disk. Use segment lock to prevent race condition
+ Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+ try {
+ segmentLock.lock();
+ // This method checks segment crc and if it has changed, the segment is not loaded.
+ _tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata);
+ } finally {
+ segmentLock.unlock();
+ }
+ }
+
+ private File getValidDocIdsSnapshotFile(String segmentName, String segmentTier) {
+ File indexDir = _tableDataManager.getSegmentDataDir(segmentName, segmentTier, _tableConfig);
+ return new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+ }
+
+ @Override
+ public boolean isPreloading() {
+ return _isPreloading;
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index a65ce670bd..cae8092b2f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -160,6 +160,20 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
}
+ @Override
+ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
+ segment.enableUpsert(this, validDocIds, queryableDocIds);
+ while (recordInfoIterator.hasNext()) {
+ RecordInfo recordInfo = recordInfoIterator.next();
+ int newDocId = recordInfo.getDocId();
+ Comparable newComparisonValue = recordInfo.getComparisonValue();
+ addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+ _primaryKeyToRecordLocationMap.put(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
+ new RecordLocation(segment, newDocId, newComparisonValue));
+ }
+ }
+
private static void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
validDocIds.replace(oldDocId, newDocId);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 55cd8497cb..fbf864bd58 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -64,6 +64,13 @@ public interface PartitionUpsertMetadataManager extends Closeable {
*/
void addSegment(ImmutableSegment segment);
+ /**
+ * Different from adding a segment, when preloading a segment, the upsert metadata may be updated more efficiently.
+ * Basically the upsert metadata can be directly updated for each primary key, without doing the more costly
+ * read-compare-update.
+ */
+ void preloadSegment(ImmutableSegment segment);
+
/**
* Updates the upsert metadata for a new consumed record in the given consuming segment.
*/
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 52007f16fe..b6ae51b265 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -19,7 +19,10 @@
package org.apache.pinot.segment.local.upsert;
import java.io.Closeable;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -32,8 +35,8 @@ import org.apache.pinot.spi.data.Schema;
*/
@ThreadSafe
public interface TableUpsertMetadataManager extends Closeable {
-
- void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics);
+ void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics,
+ HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor);
PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId);
@@ -43,4 +46,6 @@ public interface TableUpsertMetadataManager extends Closeable {
* Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
*/
void stop();
+
+ boolean isPreloading();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 307128ff63..3320c60a43 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -19,7 +19,10 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.base.Preconditions;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -36,7 +39,8 @@ public class TableUpsertMetadataManagerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
public static TableUpsertMetadataManager create(TableConfig tableConfig, Schema schema,
- TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+ TableDataManager tableDataManager, ServerMetrics serverMetrics, HelixManager helixManager,
+ @Nullable ExecutorService segmentPreloadExecutor) {
String tableNameWithType = tableConfig.getTableName();
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
Preconditions.checkArgument(upsertConfig != null, "Must provide upsert config for table: %s", tableNameWithType);
@@ -59,7 +63,7 @@ public class TableUpsertMetadataManagerFactory {
metadataManager = new ConcurrentMapTableUpsertMetadataManager();
}
- metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics);
+ metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics, helixManager, segmentPreloadExecutor);
return metadataManager;
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index a8d8e956c4..52f8590020 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -21,6 +21,8 @@ package org.apache.pinot.segment.local.indexsegment.mutable;
import java.io.File;
import java.net.URL;
import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
@@ -67,7 +69,8 @@ public class MutableSegmentImplUpsertComparisonColTest {
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class),
- mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
+ mock(ServerMetrics.class), mock(HelixManager.class), mock(ExecutorService.class))
+ .getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, offsetUpsertConfig, "secondsSinceEpoch",
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index afaf2b566a..b4ed454c2d 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -23,6 +23,8 @@ import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
@@ -82,7 +84,8 @@ public class MutableSegmentImplUpsertTest {
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class),
- mock(ServerMetrics.class)).getOrCreatePartitionManager(0);
+ mock(ServerMetrics.class), mock(HelixManager.class), mock(ExecutorService.class))
+ .getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, upsertConfigWithHash, "secondsSinceEpoch",
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 02fa90abdb..466f1885d5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -583,6 +583,58 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
+ @Test
+ public void testPreloadSegment() {
+ verifyPreloadSegment(HashFunction.NONE);
+ verifyPreloadSegment(HashFunction.MD5);
+ verifyPreloadSegment(HashFunction.MURMUR3);
+ }
+
+ private void verifyPreloadSegment(HashFunction hashFunction) {
+ String comparisonColumn = "timeCol";
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+ // Add the first segment
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ int numRecords = 3;
+ int[] primaryKeys = new int[]{0, 1, 2};
+ int[] timestamps = new int[]{100, 120, 100};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
+ // Preloading segment adds the segment without checking for upsert.
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+
+ // Add the 2nd segment
+ // segment2: 0 -> {0, 1}, 1 -> {1, 2}
+ numRecords = 2;
+ primaryKeys = new int[]{0, 1};
+ timestamps = new int[]{1, 2};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
+ upsertMetadataManager.addSegment(segment2, validDocIds2, null,
+ getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ // segment2: 0 -> {0, 1}, 1 -> {1, 2}
+ // segment2 was preloaded, so new locations got put in tracking map w/o checking on comparison values.
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 1, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment2, 1, 2, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+ }
+
@Test
public void testAddRecordWithDeleteColumn()
throws IOException {
@@ -596,8 +648,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
- false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false,
+ mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// queryableDocIds is same as validDocIds in the absence of delete markers
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
new file mode 100644
index 0000000000..ee1d6ffbd2
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class ConcurrentMapTableUpsertMetadataManagerTest {
+ private static final File TEMP_DIR =
+ new File(FileUtils.getTempDirectory(), "ConcurrentMapTableUpsertMetadataManagerTest");
+ private ExecutorService _segmentPreloadExecutor;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ _segmentPreloadExecutor = Executors.newFixedThreadPool(1);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ _segmentPreloadExecutor.shutdownNow();
+ }
+
+ @Test
+ public void testSkipPreloadSegments() {
+ TableConfig tableConfig = mock(TableConfig.class);
+ UpsertConfig upsertConfig = new UpsertConfig();
+ upsertConfig.setComparisonColumn("ts");
+ when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
+ Schema schema = mock(Schema.class);
+ when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
+
+ // Preloading is skipped as snapshot is not enabled.
+ ConcurrentMapTableUpsertMetadataManager mgr = new ConcurrentMapTableUpsertMetadataManager();
+ assertFalse(mgr.isPreloading());
+ mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
+ _segmentPreloadExecutor);
+ assertFalse(mgr.isPreloading());
+
+ // Preloading is skipped as preloading is not turned on.
+ upsertConfig.setEnableSnapshot(true);
+ mgr = new ConcurrentMapTableUpsertMetadataManager();
+ assertFalse(mgr.isPreloading());
+ mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
+ _segmentPreloadExecutor);
+ assertFalse(mgr.isPreloading());
+
+ upsertConfig.setEnablePreload(true);
+ mgr = new ConcurrentMapTableUpsertMetadataManager();
+ assertFalse(mgr.isPreloading());
+ // The preloading logic will hit on error as the HelixManager mock is not fully setup. But failure of preloading
+ // should not fail the init() method.
+ mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
+ _segmentPreloadExecutor);
+ assertFalse(mgr.isPreloading());
+ }
+
+ @Test
+ public void testPreloadOnlineSegments()
+ throws Exception {
+ Set<String> preloadedSegments = new HashSet<>();
+ AtomicBoolean wasPreloading = new AtomicBoolean(false);
+ ConcurrentMapTableUpsertMetadataManager mgr = new ConcurrentMapTableUpsertMetadataManager() {
+ @Override
+ protected IndexLoadingConfig createIndexLoadingConfig() {
+ return mock(IndexLoadingConfig.class);
+ }
+
+ @Override
+ protected void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig,
+ SegmentZKMetadata zkMetadata) {
+ wasPreloading.set(isPreloading());
+ preloadedSegments.add(segmentName);
+ }
+ };
+ // Setup mocks for TableConfig and Schema.
+ String tableNameWithType = "myTable_REALTIME";
+ TableConfig tableConfig = mock(TableConfig.class);
+ UpsertConfig upsertConfig = new UpsertConfig();
+ upsertConfig.setComparisonColumn("ts");
+ upsertConfig.setEnablePreload(true);
+ upsertConfig.setEnableSnapshot(true);
+ when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
+ when(tableConfig.getTableName()).thenReturn(tableNameWithType);
+ Schema schema = mock(Schema.class);
+ when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
+
+ // Setup mocks for HelixManager.
+ HelixManager helixManager = mock(HelixManager.class);
+ IdealState idealState = mock(IdealState.class);
+ HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
+ PropertyKey.Builder keyBuilder = mock(PropertyKey.Builder.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+ PropertyKey propKey = mock(PropertyKey.class);
+ when(helixManager.getHelixDataAccessor()).thenReturn(dataAccessor);
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ when(dataAccessor.keyBuilder()).thenReturn(keyBuilder);
+ when(keyBuilder.idealStates(anyString())).thenReturn(propKey);
+ when(dataAccessor.getProperty(propKey)).thenReturn(idealState);
+
+ // Setup mocks to return the instanceId.
+ String instanceId = "server01";
+ TableDataManager tableDataManager = mock(TableDataManager.class);
+ TableDataManagerConfig tdmc = mock(TableDataManagerConfig.class);
+ InstanceDataManagerConfig idmc = mock(InstanceDataManagerConfig.class);
+ when(tableDataManager.getTableDataManagerConfig()).thenReturn(tdmc);
+ when(tdmc.getInstanceDataManagerConfig()).thenReturn(idmc);
+ when(idmc.getInstanceId()).thenReturn(instanceId);
+
+ // Only ONLINE segments are preloaded.
+ Map<String, Map<String, String>> segStates = new HashMap<>();
+ segStates.put("consuming_seg01", ImmutableMap.of(instanceId, "CONSUMING"));
+ segStates.put("consuming_seg02", ImmutableMap.of(instanceId, "CONSUMING"));
+ segStates.put("online_seg01", ImmutableMap.of(instanceId, "ONLINE"));
+ segStates.put("online_seg02", ImmutableMap.of(instanceId, "ONLINE"));
+ segStates.put("offline_seg01", ImmutableMap.of(instanceId, "OFFLINE"));
+ segStates.put("offline_seg02", ImmutableMap.of(instanceId, "OFFLINE"));
+ when(idealState.getPartitionSet()).thenReturn(segStates.keySet());
+ for (String segName : segStates.keySet()) {
+ when(idealState.getInstanceStateMap(segName)).thenReturn(segStates.get(segName));
+ }
+
+ // Setup mocks to get file path to validDocIds snapshot.
+ SegmentZKMetadata realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg01");
+ realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ when(propertyStore.get(
+ eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg01")), any(),
+ anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
+ realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg02");
+ realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ when(propertyStore.get(
+ eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg02")), any(),
+ anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
+
+ // No snapshot file for online_seg01, so it's skipped.
+ File seg01IdxDir = new File(TEMP_DIR, "online_seg01");
+ FileUtils.forceMkdir(seg01IdxDir);
+ when(tableDataManager.getSegmentDataDir("online_seg01", null, tableConfig)).thenReturn(seg01IdxDir);
+
+ File seg02IdxDir = new File(TEMP_DIR, "online_seg02");
+ FileUtils.forceMkdir(seg02IdxDir);
+ FileUtils.touch(new File(new File(seg02IdxDir, "v3"), V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+ when(tableDataManager.getSegmentDataDir("online_seg02", null, tableConfig)).thenReturn(seg02IdxDir);
+
+ assertFalse(mgr.isPreloading());
+ mgr.init(tableConfig, schema, tableDataManager, mock(ServerMetrics.class), helixManager, _segmentPreloadExecutor);
+ assertEquals(preloadedSegments.size(), 1);
+ assertTrue(preloadedSegments.contains("online_seg02"));
+ assertTrue(wasPreloading.get());
+ assertFalse(mgr.isPreloading());
+ }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 92a82f133b..fea900201b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -99,6 +99,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
// Key is TableNameWithType-SegmentName pair.
private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;
private ExecutorService _segmentRefreshExecutor;
+ private ExecutorService _segmentPreloadExecutor;
@Override
public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) {
@@ -133,8 +134,20 @@ public class HelixInstanceDataManager implements InstanceDataManager {
SegmentBuildTimeLeaseExtender.initExecutor();
// Initialize a fixed thread pool to reload/refresh segments in parallel. The getMaxParallelRefreshThreads() is
// used to initialize a segment refresh semaphore to limit the parallelism, so create a pool of same size.
- _segmentRefreshExecutor = Executors.newFixedThreadPool(getMaxParallelRefreshThreads(),
+ int poolSize = getMaxParallelRefreshThreads();
+ Preconditions.checkArgument(poolSize > 0,
+ "SegmentRefreshExecutor requires a positive pool size but got: " + poolSize);
+ _segmentRefreshExecutor = Executors.newFixedThreadPool(poolSize,
new ThreadFactoryBuilder().setNameFormat("segment-refresh-thread-%d").build());
+ LOGGER.info("Created SegmentRefreshExecutor with pool size: {}", poolSize);
+ poolSize = _instanceDataManagerConfig.getMaxSegmentPreloadThreads();
+ if (poolSize > 0) {
+ _segmentPreloadExecutor = Executors.newFixedThreadPool(poolSize,
+ new ThreadFactoryBuilder().setNameFormat("segment-preload-thread-%d").build());
+ LOGGER.info("Created SegmentPreloadExecutor with pool size: {}", poolSize);
+ } else {
+ LOGGER.info("SegmentPreloadExecutor was not created with pool size: {}", poolSize);
+ }
// Initialize the table data manager provider
TableDataManagerProvider.init(_instanceDataManagerConfig);
LOGGER.info("Initialized Helix instance data manager");
@@ -190,6 +203,9 @@ public class HelixInstanceDataManager implements InstanceDataManager {
@Override
public synchronized void shutDown() {
_segmentRefreshExecutor.shutdownNow();
+ if (_segmentPreloadExecutor != null) {
+ _segmentPreloadExecutor.shutdownNow();
+ }
for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
tableDataManager.shutDown();
}
@@ -231,7 +247,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
TableDataManagerConfig tableDataManagerConfig = new TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
TableDataManager tableDataManager =
TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, _instanceId, _propertyStore,
- _serverMetrics, _helixManager, _errorCache, _isServerReadyToServeQueries);
+ _serverMetrics, _helixManager, _segmentPreloadExecutor, _errorCache, _isServerReadyToServeQueries);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 8f31fe1b0b..67a28dd448 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -121,6 +121,9 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
//
private static final String MAX_PARALLEL_REFRESH_THREADS = "max.parallel.refresh.threads";
+ // To preload segments of table using upsert in parallel for fast upsert metadata recovery.
+ private static final String MAX_SEGMENT_PRELOAD_THREADS = "max.segment.preload.threads";
+
// Size of cache that holds errors.
private static final String ERROR_CACHE_SIZE = "error.cache.size";
@@ -240,6 +243,10 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
return _instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_REFRESH_THREADS, 1);
}
+ public int getMaxSegmentPreloadThreads() {
+ return _instanceDataManagerConfiguration.getProperty(MAX_SEGMENT_PRELOAD_THREADS, 0);
+ }
+
public int getMaxParallelSegmentBuilds() {
return _instanceDataManagerConfiguration
.getProperty(MAX_PARALLEL_SEGMENT_BUILDS, DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS);
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 9fd8c2ca4d..2065471438 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -193,9 +193,8 @@ public abstract class BaseResourceTest {
// NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table because RealtimeTableDataManager requires
// table config.
TableDataManager tableDataManager = new OfflineTableDataManager();
- tableDataManager
- .init(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class),
- mock(HelixManager.class), null, new TableDataManagerParams(0, false, -1));
+ tableDataManager.init(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
+ mock(ServerMetrics.class), mock(HelixManager.class), null, null, new TableDataManagerParams(0, false, -1));
tableDataManager.start();
_tableDataManagerMap.put(tableNameWithType, tableDataManager);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index c589b73a01..a9981de4ff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -60,6 +60,9 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
private boolean _enableSnapshot;
+ @JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery")
+ private boolean _enablePreload;
+
@JsonPropertyDescription("Custom class for upsert metadata manager")
private String _metadataManagerClass;
@@ -108,6 +111,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _enableSnapshot;
}
+ public boolean isEnablePreload() {
+ return _enablePreload;
+ }
+
@Nullable
public String getMetadataManagerClass() {
return _metadataManagerClass;
@@ -172,6 +179,10 @@ public class UpsertConfig extends BaseJsonConfig {
_enableSnapshot = enableSnapshot;
}
+ public void setEnablePreload(boolean enablePreload) {
+ _enablePreload = enablePreload;
+ }
+
public void setMetadataManagerClass(String metadataManagerClass) {
_metadataManagerClass = metadataManagerClass;
}
diff --git a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
index ecf92d5871..d7fa20d8fc 100644
--- a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
@@ -57,6 +57,8 @@
"instanceSelectorType": "strictReplicaGroup"
},
"upsertConfig": {
- "mode": "FULL"
+ "mode": "FULL",
+ "enableSnapshot": "true",
+ "enablePreload": "true"
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org