Posted to commits@pinot.apache.org by "klsince (via GitHub)" <gi...@apache.org> on 2023/06/30 22:07:20 UTC

[GitHub] [pinot] klsince opened a new pull request, #11020: allow to preload segments with upsert snapshots to speedup table loading

klsince opened a new pull request, #11020:
URL: https://github.com/apache/pinot/pull/11020

   `feature`
   
   This PR adds a feature to preload segments for tables that use the upsert snapshot feature. Segments with validDocIds snapshots can be preloaded more efficiently, which speeds up table loading (e.g. on server restarts).
   
   Basically, the primary keys of all valid docs across all segments with validDocIds snapshots are unique, so we can simply put them into the upsert metadata map without doing the costly checks for duplicate primary keys. Once preloading is done, the remaining segments are loaded as usual, i.e. checking for duplicates and updating validDocIds in the existing segments. This feature adds synchronization logic between the Helix threads and the preloading threads for correctness.
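A minimal sketch of the difference between the two paths, assuming a simplified in-memory model: a `HashMap` from primary key to record location, and a `Set<Integer>` per segment standing in for the validDocIds bitmap. `UpsertPreloadSketch`, `RecordLocation`, `addRecord` and `preloadRecord` are hypothetical names for illustration, not Pinot's classes:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified upsert metadata: primary key -> latest record location, plus valid docs per segment. */
public class UpsertPreloadSketch {
  /** Where the latest record for a primary key lives, plus the value used to decide which record wins. */
  record RecordLocation(String segmentName, int docId, long comparisonValue) {}

  final Map<String, RecordLocation> primaryKeyToLocation = new HashMap<>();
  final Map<String, Set<Integer>> validDocIds = new HashMap<>();

  /** Normal loading: read the current location, compare, then update the map and the valid docs. */
  void addRecord(String primaryKey, RecordLocation incoming) {
    RecordLocation current = primaryKeyToLocation.get(primaryKey);
    if (current != null && incoming.comparisonValue() < current.comparisonValue()) {
      return; // The incoming record is older, so it never becomes valid.
    }
    if (current != null) {
      validDocIds.get(current.segmentName()).remove(current.docId()); // Invalidate the replaced doc.
    }
    primaryKeyToLocation.put(primaryKey, incoming);
    validDocIds.computeIfAbsent(incoming.segmentName(), s -> new HashSet<>()).add(incoming.docId());
  }

  /** Preloading: keys of valid docs in snapshotted segments are unique, so skip the lookup and comparison. */
  void preloadRecord(String primaryKey, RecordLocation location) {
    primaryKeyToLocation.put(primaryKey, location);
    validDocIds.computeIfAbsent(location.segmentName(), s -> new HashSet<>()).add(location.docId());
  }

  public static void main(String[] args) {
    UpsertPreloadSketch m = new UpsertPreloadSketch();
    // Phase 1: preload segments that have validDocIds snapshots.
    m.preloadRecord("pk1", new RecordLocation("seg_0", 0, 100L));
    m.preloadRecord("pk2", new RecordLocation("seg_1", 3, 200L));
    // Phase 2: a segment without a snapshot goes through the normal path.
    m.addRecord("pk1", new RecordLocation("seg_2", 7, 300L));
    System.out.println(m.primaryKeyToLocation.get("pk1").segmentName()); // seg_2
    System.out.println(m.validDocIds.get("seg_0"));                      // []
  }
}
```

The preload path skips the `get` and the comparison, which is only safe because the keys of valid docs across snapshotted segments are unique; segments loaded afterwards still go through `addRecord`.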
   
   ## Release Note ##
   1. added a new instance config: `max.segment.preload.threads` to configure the number of background threads used to preload segments. The thread pool is shared across tables.
   2. added a table config (upsert config): `enablePreload`, whether to enable preloading.
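A rough sketch of how a server could turn `max.segment.preload.threads` into one executor shared across tables. The `Properties` lookup, the default of 0, the thread names, and returning `null` to mean "preload sequentially" are all assumptions for illustration (the reviewed diff does check for a null `_segmentPreloadExecutor`):

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class PreloadExecutorSketch {
  static final String MAX_SEGMENT_PRELOAD_THREADS = "max.segment.preload.threads";
  // Assumed default for this sketch: 0 means "no dedicated preload threads".
  static final int DEFAULT_MAX_SEGMENT_PRELOAD_THREADS = 0;

  static int getMaxSegmentPreloadThreads(Properties instanceConfig) {
    String value = instanceConfig.getProperty(MAX_SEGMENT_PRELOAD_THREADS);
    return value == null ? DEFAULT_MAX_SEGMENT_PRELOAD_THREADS : Integer.parseInt(value.trim());
  }

  /** Creates one executor for the whole server, or returns null so callers preload sequentially. */
  static ExecutorService createSegmentPreloadExecutor(Properties instanceConfig) {
    int numThreads = getMaxSegmentPreloadThreads(instanceConfig);
    if (numThreads <= 0) {
      return null;
    }
    AtomicInteger threadId = new AtomicInteger();
    ThreadFactory threadFactory = runnable -> {
      Thread thread = new Thread(runnable, "segment-preload-thread-" + threadId.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
    return Executors.newFixedThreadPool(numThreads, threadFactory);
  }

  public static void main(String[] args) {
    Properties instanceConfig = new Properties();
    instanceConfig.setProperty(MAX_SEGMENT_PRELOAD_THREADS, "4");
    ExecutorService sharedPreloadExecutor = createSegmentPreloadExecutor(instanceConfig);
    // The same executor instance would be handed to every table's upsert metadata manager.
    System.out.println(sharedPreloadExecutor != null); // true
    sharedPreloadExecutor.shutdown();
  }
}
```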


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1250232760


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -190,7 +192,11 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
       if (queryableDocIds == null && _deleteRecordColumn != null) {
         queryableDocIds = new ThreadSafeMutableRoaringBitmap();
       }
-      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+      if (_tableUpsertMetadataManager.isPreloadingSegment(segmentName)) {

Review Comment:
   hmm.. after some thought, I could not figure out how to make the changes you proposed.
   
   Basically, if we call `preloadSegment(ImmutableSegment segment)` as part of `handleUpsert()`, which is part of `addSegment(ImmutableSegment segment)` of `RealtimeTableDataManager`, I think that's too late for the snapshot existence check, because the segment is already fully loaded once the `ImmutableSegment` object is created. The logic in the current PR can skip the segment if it doesn't have the snapshot file.
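To illustrate why checking before loading is cheaper: the snapshot check only needs the segment directory on disk, not a constructed `ImmutableSegment`. The sketch below assumes the snapshot is a file named `validdocids.bitmap.snapshot` inside each segment directory; that name and layout are assumptions, so use whatever constant the codebase actually defines:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class SnapshotCheckSketch {
  // Assumed snapshot file name; substitute the constant the codebase actually defines.
  static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot";

  /** Cheap existence check on disk, done before any ImmutableSegment is created. */
  static boolean hasValidDocIdsSnapshot(Path segmentDir) {
    return Files.isRegularFile(segmentDir.resolve(VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
  }

  /** Returns the dirs to preload; the rest are appended to {@code skipped} for normal Helix loading. */
  static List<Path> selectSegmentsToPreload(List<Path> segmentDirs, List<Path> skipped) {
    List<Path> toPreload = new ArrayList<>();
    for (Path segmentDir : segmentDirs) {
      if (hasValidDocIdsSnapshot(segmentDir)) {
        toPreload.add(segmentDir);
      } else {
        skipped.add(segmentDir);
      }
    }
    return toPreload;
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("segments");
    Path seg0 = Files.createDirectories(root.resolve("seg_0"));
    Files.createFile(seg0.resolve(VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
    Path seg1 = Files.createDirectories(root.resolve("seg_1"));
    List<Path> skipped = new ArrayList<>();
    List<Path> toPreload = selectSegmentsToPreload(List.of(seg0, seg1), skipped);
    System.out.println(toPreload.size() + " to preload, " + skipped.size() + " skipped"); // 1 to preload, 1 skipped
  }
}
```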




[GitHub] [pinot] navina commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1253977267


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java:
##########
@@ -64,6 +64,11 @@ public interface PartitionUpsertMetadataManager extends Closeable {
    */
   void addSegment(ImmutableSegment segment);
 
+  /**
+   * Different from adding a segment, when preloading a segment, the upsert metadata may be updated more efficiently.

Review Comment:
   nit: make this more informative by saying "upsert metadata will be directly updated instead of read-compare-update for each primary key" 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +108,113 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+        // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+        // The thread doing the segment preloading here must complete before the other helix threads start to handle
+        // segment state transitions. This is ensured implicitly because segment preloading happens here when
+        // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+        // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+        _isPreloading = true;
+        preloadSegments();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Got interrupted to preload segments for table: " + _tableNameWithType, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to preload segments for table: " + _tableNameWithType, e);
+    } finally {
+      _isPreloading = false;
+    }
+  }
+
+  /**
+   * Get the ideal state and find segments assigned to current instance, then preload those with validDocIds snapshot.
+   * Skip those without the snapshots and those whose crc has changed, as they will be handled by normal Helix state
+   * transitions, which will proceed after the preloading phase fully completes.
+   */
+  private void preloadSegments()
+      throws ExecutionException, InterruptedException {
+    LOGGER.info("Preload segments for table: {} fast upsert metadata recovery", _tableNameWithType);
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, _tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore();
+    String instanceId = getInstanceId();
+    IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+    List<Future<?>> futures = new ArrayList<>();
+    for (String segmentName : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+      String state = instanceStateMap.get(instanceId);
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+        LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", segmentName, state);
+        continue;
+      }
+      if (_segmentPreloadExecutor == null) {
+        preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+        continue;
+      }
+      futures.add(_segmentPreloadExecutor.submit(() -> {
+        preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+      }));
+    }
+    for (Future<?> f : futures) {
+      f.get();

Review Comment:
   For my understanding: if any of the preloadSegment calls throws an exception, it gets handled in the `init` method's try-catch block, where a runtime exception gets thrown. Is that really necessary if a preload fails?
   Can we not treat a failed preload as a segment that would eventually get loaded by a Helix state transition?
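For reference, a small standalone sketch of the fail-fast behavior being asked about: with the `Future.get()` loop from the diff, the first failed preload task surfaces as an `ExecutionException` and aborts the loop. `preloadSegment(String)` here is a hypothetical stand-in that fails for names starting with `bad`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FailFastPreloadSketch {
  // Hypothetical stand-in for preloadSegment(segmentName, indexLoadingConfig, propertyStore).
  static void preloadSegment(String segmentName) {
    if (segmentName.startsWith("bad")) {
      throw new IllegalStateException("Failed to preload segment: " + segmentName);
    }
  }

  /** Same shape as the loop in the diff: submit all tasks, then wait on each future in order. */
  static void preloadAll(ExecutorService executor, List<String> segmentNames)
      throws InterruptedException, ExecutionException {
    List<Future<?>> futures = new ArrayList<>();
    for (String segmentName : segmentNames) {
      futures.add(executor.submit(() -> preloadSegment(segmentName)));
    }
    for (Future<?> future : futures) {
      // Wraps a task's exception in ExecutionException; the loop stops there and the
      // remaining tasks keep running without being awaited.
      future.get();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      preloadAll(executor, List.of("seg_0", "bad_seg_1", "seg_2"));
    } catch (ExecutionException e) {
      // In the diff, this is where init() would rethrow as a RuntimeException.
      System.out.println("preload aborted: " + e.getCause().getMessage());
    } finally {
      executor.shutdown();
    }
  }
}
```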




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1255271973


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -173,13 +173,82 @@ protected void doAddSegment(ImmutableSegmentImpl segment) {
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  @Override
+  public void preloadSegment(ImmutableSegment segment) {
+    String segmentName = segment.getSegmentName();
+    if (_stopped) {
+      _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
+      return;
+    }
+    if (!_enableSnapshot) {
+      _logger.info("Skip preloading segment: {} because snapshot is not enabled", segmentName);
+      return;
+    }
+    if (segment instanceof EmptyIndexSegment) {
+      _logger.info("Skip preloading empty segment: {}", segmentName);
+      return;
+    }

Review Comment:
   Similarly here, this should never be hit



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +107,127 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+      // Preloading the segments with snapshots for fast upsert metadata recovery.
+      // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+      // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+      // The thread doing the segment preloading here must complete before the other helix threads start to handle
+      // segment state transitions. This is ensured implicitly because segment preloading happens here when
+      // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+      // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+      try {
+        _isPreloading = true;
+        preloadSegments();
+      } catch (Exception e) {
+        // Even if preloading fails, we should continue to complete the initialization, so that TableDataManager can be

Review Comment:
   This can potentially cause unexpected behavior. Let's not swallow exceptions with this global catch. If we know certain preload failures can be skipped, we can catch them at the per-segment level so that we can continue preloading the remaining segments.
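A sketch of the per-segment alternative, assuming a failed segment leaves no partial upsert metadata behind (that is the "if we know certain preload failures can be skipped" caveat). Failures are collected per segment so the rest keep preloading, while `InterruptedException` still propagates. Class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class PerSegmentPreloadSketch {
  /**
   * Preloads every segment and returns the names of those that failed. A failure affects only its own
   * segment, so the others are still preloaded; failed ones are left to the normal loading path.
   * InterruptedException is not swallowed.
   */
  static List<String> preloadAll(ExecutorService executor, List<String> segmentNames, Consumer<String> preloader)
      throws InterruptedException {
    List<Future<?>> futures = new ArrayList<>();
    for (String segmentName : segmentNames) {
      futures.add(executor.submit(() -> preloader.accept(segmentName)));
    }
    List<String> failedSegments = new ArrayList<>();
    for (int i = 0; i < futures.size(); i++) {
      try {
        futures.get(i).get();
      } catch (ExecutionException e) {
        // Per-segment handling: record (and in real code, log) the failure, then keep going.
        failedSegments.add(segmentNames.get(i));
      }
    }
    return failedSegments;
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      List<String> failed = preloadAll(executor, List.of("seg_0", "bad_seg_1", "seg_2"), segmentName -> {
        if (segmentName.startsWith("bad")) {
          throw new IllegalStateException("Failed to preload segment: " + segmentName);
        }
      });
      System.out.println("left to normal loading: " + failed); // left to normal loading: [bad_seg_1]
    } finally {
      executor.shutdown();
    }
  }
}
```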



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +107,127 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+      // Preloading the segments with snapshots for fast upsert metadata recovery.
+      // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+      // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+      // The thread doing the segment preloading here must complete before the other helix threads start to handle
+      // segment state transitions. This is ensured implicitly because segment preloading happens here when
+      // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+      // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+      try {
+        _isPreloading = true;
+        preloadSegments();
+      } catch (Exception e) {
+        // Even if preloading fails, we should continue to complete the initialization, so that TableDataManager can be
+        // created. Once TableDataManager is created, no more segment preloading would happen, and the normal segment
+        // loading logic would be used. The segments not being preloaded successfully here would be loaded via the
+        // normal segment loading logic, the one doing more costly checks on the upsert metadata.
+        LOGGER.warn("Failed to preload segments for table: {}", _tableNameWithType, e);
+        if (e instanceof InterruptedException) {
+          // Restore the interrupted status in case the upper callers want to check.
+          Thread.currentThread().interrupt();
+        }
+      } finally {
+        _isPreloading = false;
+      }
+    }
+  }
+
+  /**
+   * Get the ideal state and find segments assigned to current instance, then preload those with validDocIds snapshot.
+   * Skip those without the snapshots and those whose crc has changed, as they will be handled by normal Helix state
+   * transitions, which will proceed after the preloading phase fully completes.
+   */
+  private void preloadSegments()
+      throws Exception {
+    LOGGER.info("Preload segments in table: {} for fast upsert metadata recovery", _tableNameWithType);
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, _tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore();
+    String instanceId = getInstanceId();
+    IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+    List<Future<?>> futures = new ArrayList<>();
+    for (String segmentName : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+      String state = instanceStateMap.get(instanceId);
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+        LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", segmentName, state);
+        continue;
+      }
+      if (_segmentPreloadExecutor == null) {

Review Comment:
   (optional) Should we consider not allowing preloading when the executor is not configured? Without it, preloading becomes single-threaded, which can potentially cause performance degradation.
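One possible shape of that option, as a hypothetical guard evaluated before preloading starts (names are illustrative, not from the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PreloadGuardSketch {
  /** Hypothetical guard: preload only when snapshots and preloading are enabled and a preload executor exists. */
  static boolean shouldPreload(boolean enableSnapshot, boolean enablePreload, ExecutorService preloadExecutor) {
    // Without an executor, preloading would run on the single Helix thread doing init(), so skip it
    // and let all segments take the normal loading path instead.
    return enableSnapshot && enablePreload && preloadExecutor != null;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    System.out.println(shouldPreload(true, true, null));     // false
    System.out.println(shouldPreload(true, true, executor)); // true
    executor.shutdown();
  }
}
```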



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -173,13 +173,82 @@ protected void doAddSegment(ImmutableSegmentImpl segment) {
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  @Override
+  public void preloadSegment(ImmutableSegment segment) {
+    String segmentName = segment.getSegmentName();
+    if (_stopped) {
+      _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
+      return;
+    }
+    if (!_enableSnapshot) {
+      _logger.info("Skip preloading segment: {} because snapshot is not enabled", segmentName);
+      return;
+    }

Review Comment:
   We should throw an exception here. If we are preloading the segment, the snapshot must exist.
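A sketch of the suggested change, using a hypothetical stripped-down manager. It keeps the `_stopped` check as a skip, as in the diff, while a disabled snapshot becomes an invariant violation. Field names mirror the diff, but the class is illustrative only:

```java
public class PreloadSegmentGuardSketch {
  private final boolean _enableSnapshot;
  private volatile boolean _stopped;

  PreloadSegmentGuardSketch(boolean enableSnapshot) {
    _enableSnapshot = enableSnapshot;
  }

  void stop() {
    _stopped = true;
  }

  /** Returns true if the segment was preloaded, false if it was skipped because the manager is stopped. */
  boolean preloadSegment(String segmentName) {
    if (_stopped) {
      return false; // Kept as a skip, as in the diff.
    }
    if (!_enableSnapshot) {
      // Preloading is only triggered for segments with snapshots, so reaching here is a bug.
      throw new IllegalStateException("Snapshot must be enabled to preload segment: " + segmentName);
    }
    // ... put primary keys from the snapshot's valid docs into the metadata map (omitted) ...
    return true;
  }

  public static void main(String[] args) {
    PreloadSegmentGuardSketch manager = new PreloadSegmentGuardSketch(false);
    try {
      manager.preloadSegment("seg_0");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // Snapshot must be enabled to preload segment: seg_0
    }
  }
}
```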



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java:
##########
@@ -35,8 +32,7 @@ private TableUpsertMetadataManagerFactory() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
 
-  public static TableUpsertMetadataManager create(TableConfig tableConfig, Schema schema,
-      TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+  public static TableUpsertMetadataManager create(TableConfig tableConfig) {

Review Comment:
   Suggest not making this change. For plugin factories, `init()` is usually called within the factory
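To illustrate the convention the reviewer refers to: the factory both instantiates the plugin class and calls `init()` before returning it, so callers never hold an uninitialized instance. `MetadataManagerPlugin` and `DefaultMetadataManager` are hypothetical, and reflection-based loading is an assumption about how such factories typically pick the implementation class:

```java
import java.util.Map;

public class PluginFactorySketch {
  /** Hypothetical plugin contract, not Pinot's actual interface. */
  public interface MetadataManagerPlugin {
    void init(Map<String, String> config);

    boolean isInitialized();
  }

  /** Hypothetical default implementation. */
  public static class DefaultMetadataManager implements MetadataManagerPlugin {
    private boolean _initialized;

    @Override
    public void init(Map<String, String> config) {
      // A real manager would read its config here (and, with this PR, possibly preload segments).
      _initialized = true;
    }

    @Override
    public boolean isInitialized() {
      return _initialized;
    }
  }

  /** Instantiates the configured class and initializes it before handing it out. */
  public static MetadataManagerPlugin create(String className, Map<String, String> config) {
    try {
      MetadataManagerPlugin plugin =
          (MetadataManagerPlugin) Class.forName(className).getDeclaredConstructor().newInstance();
      plugin.init(config);
      return plugin;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Failed to create metadata manager: " + className, e);
    }
  }

  public static void main(String[] args) {
    MetadataManagerPlugin manager = create(DefaultMetadataManager.class.getName(), Map.of());
    System.out.println(manager.isInitialized()); // true
  }
}
```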




[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1248859078


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -190,7 +192,11 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
       if (queryableDocIds == null && _deleteRecordColumn != null) {
         queryableDocIds = new ThreadSafeMutableRoaringBitmap();
       }
-      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+      if (_tableUpsertMetadataManager.isPreloadingSegment(segmentName)) {

Review Comment:
   This waiting logic can be removed, as discussed in another comment, so there will likely be no changes to this class.
   
   As in this PR, the TableUpsertMetadataMgr does the preloading and also checks for snapshot existence. I’ll think about whether adding that preloadSegment() would simplify the code further.




[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1250183518


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -135,6 +136,11 @@ public synchronized void init(PinotConfiguration config, HelixManager helixManag
     // used to initialize a segment refresh semaphore to limit the parallelism, so create a pool of same size.
     _segmentRefreshExecutor = Executors.newFixedThreadPool(getMaxParallelRefreshThreads(),
         new ThreadFactoryBuilder().setNameFormat("segment-refresh-thread-%d").build());
+    LOGGER.info("Initialized segment refresh executor thread pool: {}", getMaxParallelRefreshThreads());
+    _segmentPreloadExecutor = Executors.newFixedThreadPool(_instanceDataManagerConfig.getMaxSegmentPreloadThreads(),

Review Comment:
   sgtm




[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1253575809


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -190,7 +192,11 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
       if (queryableDocIds == null && _deleteRecordColumn != null) {
         queryableDocIds = new ThreadSafeMutableRoaringBitmap();
       }
-      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+      if (_tableUpsertMetadataManager.isPreloadingSegment(segmentName)) {

Review Comment:
   updated. thanks for the idea 👍




[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1253587772


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +115,145 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _preloadExecutor = preloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        preloadSegments();

Review Comment:
   In fact, the `createTableDataManager()` method pasted above creates the TableDataManager object first and then calls its init() method. The preloading logic happens as part of the init() method. So among the Helix threads about to handle segment state transitions, only the first Helix thread proceeds to preload segments, and the other Helix threads wait for it to finish. This is ensured by ConcurrentHashMap.computeIfAbsent(... lambda), which runs the lambda atomically.
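A standalone sketch of the guarantee this relies on. `ConcurrentHashMap.computeIfAbsent` applies the mapping function at most once per key, and in the JDK implementation other callers for the same key block until that function returns. The table name and the sleep standing in for `init()` preloading are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ComputeIfAbsentSketch {
  /**
   * Simulates several Helix threads asking for the same table's data manager at the same time and
   * returns how many times the creation lambda (standing in for create + init + preload) ran.
   */
  static int simulateConcurrentCreation(int numHelixThreads) throws InterruptedException {
    ConcurrentHashMap<String, Object> tableDataManagers = new ConcurrentHashMap<>();
    AtomicInteger creations = new AtomicInteger();
    Set<Object> seenManagers = ConcurrentHashMap.newKeySet();
    CountDownLatch start = new CountDownLatch(1);
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < numHelixThreads; i++) {
      Thread thread = new Thread(() -> {
        try {
          start.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        // One thread runs the lambda; the others block until it returns and then get its result.
        Object manager = tableDataManagers.computeIfAbsent("myTable_REALTIME", tableName -> {
          creations.incrementAndGet();
          sleepQuietly(100); // Stand-in for init() preloading segments.
          return new Object();
        });
        seenManagers.add(manager);
      });
      threads.add(thread);
      thread.start();
    }
    start.countDown();
    for (Thread thread : threads) {
      thread.join();
    }
    if (seenManagers.size() != 1) {
      throw new IllegalStateException("Threads observed different managers: " + seenManagers.size());
    }
    return creations.get();
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("creations: " + simulateConcurrentCreation(4)); // creations: 1
  }
}
```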




[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1250189715


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java:
##########
@@ -43,4 +46,9 @@ public interface TableUpsertMetadataManager extends Closeable {
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
   void stop();
+
+  void waitTillReady()

Review Comment:
   Delete, as there is no need for this method anymore, as discussed below.




[GitHub] [pinot] KKcorps commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1251129536


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +115,145 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _preloadExecutor = preloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        preloadSegments();

Review Comment:
   What I meant was: we will need to create the table data manager before we even receive the first Helix message, right?
   Otherwise, the preloading will trigger at the exact same time as we start loading the segments.




[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1256227999


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +107,127 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+      // Preloading the segments with snapshots for fast upsert metadata recovery.
+      // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+      // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+      // The thread doing the segment preloading here must complete before the other helix threads start to handle
+      // segment state transitions. This is ensured implicitly because segment preloading happens here when
+      // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+      // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+      try {
+        _isPreloading = true;
+        preloadSegments();
+      } catch (Exception e) {
+        // Even if preloading fails, we should continue to complete the initialization, so that TableDataManager can be

Review Comment:
   good point for finer handling, will add. 
   
   But at a higher level, we'd better not fail the init() method due to an uncaught preloading failure, so that we can create the tableDataMgr and fall back to the normal loading flow. If init() keeps failing due to an uncaught preloading failure, creation of the tableDataMgr would be retried, leading to another round of preloading each time init() is called again.




[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1248859798


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -190,7 +192,11 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
       if (queryableDocIds == null && _deleteRecordColumn != null) {
         queryableDocIds = new ThreadSafeMutableRoaringBitmap();
       }
-      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+      if (_tableUpsertMetadataManager.isPreloadingSegment(segmentName)) {

Review Comment:
   Currently the snapshot existence check is done in table upsert mgr’s preloadSegments method. But I’ll think more about the idea proposed here. 




[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1250232760


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -190,7 +192,11 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
       if (queryableDocIds == null && _deleteRecordColumn != null) {
         queryableDocIds = new ThreadSafeMutableRoaringBitmap();
       }
-      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+      if (_tableUpsertMetadataManager.isPreloadingSegment(segmentName)) {

Review Comment:
   hmm.. after some thought, I could not figure out how to make the changes you proposed.
   
   Basically, suppose we call `preloadSegment(ImmutableSegment segment)` as part of `handleUpsert()`, which is part of `addSegment(ImmutableSegment segment)` of `RealtimeTableDataManager`. I think that's too late for the snapshot existence check, because by then the segment is already fully loaded: the `ImmutableSegment` object has already been created. The logic in the current PR can skip a segment without loading it if it doesn't have the snapshot file.
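To make the ordering argument concrete, here is a minimal hypothetical sketch, assuming the validDocIds snapshot is stored as a plain file inside each segment's local directory. The eligibility check only needs the directory path, so it can run before any `ImmutableSegment` is built. `PreloadEligibility`, `selectForPreload()` and the file name `validdocids.bitmap.snapshot` are illustrative assumptions, not the PR's actual code.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: picks preload candidates by inspecting segment directories only,
// i.e. before any segment is loaded into an ImmutableSegment.
final class PreloadEligibility {
  // Assumed snapshot file name; the real constant in Pinot may differ.
  static final String SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot";

  private PreloadEligibility() {
  }

  /** Returns true if the segment directory contains a validDocIds snapshot file. */
  static boolean hasValidDocIdsSnapshot(Path segmentDir) {
    return Files.isRegularFile(segmentDir.resolve(SNAPSHOT_FILE_NAME));
  }

  /**
   * Returns the directories that can be preloaded; the others are appended to {@code skipped}
   * and left to the normal Helix state-transition loading flow.
   */
  static List<Path> selectForPreload(List<Path> segmentDirs, List<Path> skipped) {
    List<Path> toPreload = new ArrayList<>();
    for (Path dir : segmentDirs) {
      if (hasValidDocIdsSnapshot(dir)) {
        toPreload.add(dir);
      } else {
        skipped.add(dir); // cheap skip: nothing was loaded for this segment
      }
    }
    return toPreload;
  }
}
```

The design point is that a file-existence check is far cheaper than building the segment object, which is why the check belongs before loading rather than inside `handleUpsert()`.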



[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1256160640


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java:
##########
@@ -35,8 +32,7 @@ private TableUpsertMetadataManagerFactory() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
 
-  public static TableUpsertMetadataManager create(TableConfig tableConfig, Schema schema,
-      TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+  public static TableUpsertMetadataManager create(TableConfig tableConfig) {

Review Comment:
   kk. I made this change because `isUpsertEnabled()` is called in `addSegment()` to handle upsert-specific states, so `_tableUpsertMetadataManager` needs to be set before preloading segments, which happens as part of the `init()` method. Let me use a boolean variable here.
   ```
   public boolean isUpsertEnabled() {
     return _tableUpsertMetadataManager != null;
   }
   
   public void addSegment(ImmutableSegment immutableSegment) {
     if (isUpsertEnabled()) {
       handleUpsert(immutableSegment);
       return;
     }
   ...
   ```
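A hypothetical, heavily simplified sketch of why the manager has to be assigned before `init()` runs: preloading inside `init()` calls back into `addSegment()`, which routes on `isUpsertEnabled()`, so creating and publishing the manager first (as the reduced `create(TableConfig)` signature above allows) makes preloaded segments take the upsert path. All class and method names here are stand-ins, not Pinot's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins showing why the upsert manager is published before init() preloads.
final class TwoPhaseInitSketch {
  private TwoPhaseInitSketch() {
  }

  // Stand-in for TableUpsertMetadataManager.
  static final class UpsertManagerStub {
    void init(TableDataManagerStub owner, List<String> segmentsWithSnapshots) {
      // Preloading calls back into the table data manager for each segment.
      for (String segment : segmentsWithSnapshots) {
        owner.addSegment(segment);
      }
    }
  }

  // Stand-in for RealtimeTableDataManager.
  static final class TableDataManagerStub {
    UpsertManagerStub _tableUpsertMetadataManager;
    final List<String> upsertSegments = new ArrayList<>();
    final List<String> nonUpsertSegments = new ArrayList<>();

    boolean isUpsertEnabled() {
      return _tableUpsertMetadataManager != null;
    }

    void addSegment(String segment) {
      if (isUpsertEnabled()) {
        upsertSegments.add(segment); // stands in for handleUpsert(segment)
        return;
      }
      nonUpsertSegments.add(segment);
    }

    /** Create, publish, then init: preloaded segments see isUpsertEnabled() == true. */
    void initUpsert(List<String> segmentsWithSnapshots) {
      _tableUpsertMetadataManager = new UpsertManagerStub();
      _tableUpsertMetadataManager.init(this, segmentsWithSnapshots);
    }

    /** Init before publishing: preloaded segments are wrongly treated as non-upsert. */
    void initUpsertWrongOrder(List<String> segmentsWithSnapshots) {
      UpsertManagerStub manager = new UpsertManagerStub();
      manager.init(this, segmentsWithSnapshots);
      _tableUpsertMetadataManager = manager;
    }
  }
}
```

In the wrong order, segments preloaded during `init()` land on the non-upsert path because the field is still `null` when `isUpsertEnabled()` is evaluated.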



[GitHub] [pinot] navina commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1254767345


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +108,113 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+        // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+        // The thread doing the segment preloading here must complete before the other helix threads start to handle
+        // segment state transitions. This is ensured implicitly because segment preloading happens here when
+        // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+        // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+        _isPreloading = true;
+        preloadSegments();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Got interrupted to preload segments for table: " + _tableNameWithType, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to preload segments for table: " + _tableNameWithType, e);
+    } finally {
+      _isPreloading = false;
+    }
+  }
+
+  /**
+   * Get the ideal state and find segments assigned to current instance, then preload those with validDocIds snapshot.
+   * Skip those without the snapshots and those whose crc has changed, as they will be handled by normal Helix state
+   * transitions, which will proceed after the preloading phase fully completes.
+   */
+  private void preloadSegments()
+      throws ExecutionException, InterruptedException {
+    LOGGER.info("Preload segments for table: {} fast upsert metadata recovery", _tableNameWithType);
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, _tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore();
+    String instanceId = getInstanceId();
+    IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+    List<Future<?>> futures = new ArrayList<>();
+    for (String segmentName : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+      String state = instanceStateMap.get(instanceId);
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+        LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", segmentName, state);
+        continue;
+      }
+      if (_segmentPreloadExecutor == null) {
+        preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+        continue;
+      }
+      futures.add(_segmentPreloadExecutor.submit(() -> {
+        preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+      }));
+    }
+    for (Future<?> f : futures) {
+      f.get();

Review Comment:
   Yes. As you explained, we can gracefully fall back to normal segment loading if preloading a segment fails for whatever reason.



[GitHub] [pinot] Jackie-Jiang merged pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #11020:
URL: https://github.com/apache/pinot/pull/11020


[GitHub] [pinot] KKcorps commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1248866166


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +115,145 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _preloadExecutor = preloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        preloadSegments();

Review Comment:
   Also, if I understand correctly, the table data manager is created when the first message is received in the Helix queue. We might need to change that as well.



[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1250228382


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +115,145 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _preloadExecutor = preloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        preloadSegments();

Review Comment:
   PR updated. The explicit waiting logic with lock/condition was removed and replaced with the implicit waiting logic ensured by `ConcurrentHashMap.computeIfAbsent()` in `HelixInstanceDataManager`. I left a comment about this implicit behavior.
   
   hey @KKcorps 
   > Also the table data manager is created when first message is received in the Helix queue iiuc.
   
   Yes. This is the existing implementation of `HelixInstanceDataManager`, e.g.:
   ```
   public void addRealtimeSegment(String realtimeTableName, String segmentName)
   ...
   _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig))
   ...
   ```
   
   > We might need to change that as well.
   
   Sorry, I didn't get where the change would be needed. Could you elaborate a bit more on this? Thank you.
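The implicit waiting can be demonstrated with plain JDK code. In this hypothetical sketch, the mapping function passed to `ConcurrentHashMap.computeIfAbsent()` stands in for creating the TableDataManager and preloading segments. A second thread asking for the same table key does not run its own function and only gets the value after the first function returns, because the JDK documents the whole invocation as atomic, with the function applied at most once per key. The key `myTable_REALTIME` and the class names are placeholders.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: a second "Helix thread" blocks on computeIfAbsent() for the same key
// until the first thread's mapping function (create manager + preload) has returned.
final class ComputeIfAbsentWaitDemo {
  private static final String KEY = "myTable_REALTIME";

  private ComputeIfAbsentWaitDemo() {
  }

  /** What the second thread observed. */
  static final class Result {
    int creations;                // how many times a mapping function ran
    boolean secondSawPreloadDone; // preloading had finished when the second thread got the value
    boolean sameInstance;         // both threads got the same "table data manager"
  }

  static Result run() throws InterruptedException {
    ConcurrentHashMap<String, Object> tableDataManagerMap = new ConcurrentHashMap<>();
    AtomicInteger creations = new AtomicInteger();
    AtomicBoolean preloadDone = new AtomicBoolean(false);
    AtomicBoolean secondSawPreloadDone = new AtomicBoolean(false);
    AtomicReference<Object> firstValue = new AtomicReference<>();
    AtomicReference<Object> secondValue = new AtomicReference<>();
    CountDownLatch firstInsideFunction = new CountDownLatch(1);

    Thread first = new Thread(() -> firstValue.set(tableDataManagerMap.computeIfAbsent(KEY, k -> {
      creations.incrementAndGet();
      firstInsideFunction.countDown();
      try {
        Thread.sleep(200); // simulate slow segment preloading inside init()
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      preloadDone.set(true);
      return new Object(); // stands in for the new TableDataManager
    })));
    Thread second = new Thread(() -> {
      // Same key: waits for the first mapping function, then reuses its value.
      Object value = tableDataManagerMap.computeIfAbsent(KEY, k -> {
        creations.incrementAndGet();
        return new Object();
      });
      secondSawPreloadDone.set(preloadDone.get());
      secondValue.set(value);
    });

    first.start();
    firstInsideFunction.await(); // ensure the first thread is mid-"preload"
    second.start();
    first.join();
    second.join();

    Result result = new Result();
    result.creations = creations.get();
    result.secondSawPreloadDone = secondSawPreloadDone.get();
    result.sameInstance = firstValue.get() == secondValue.get();
    return result;
  }
}
```

One caveat from the `ConcurrentHashMap` Javadoc: the mapping function should be short and simple, and some update operations by other threads may be blocked while it runs, so a long preload inside it can also delay updates to other keys that share its hash bin.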



[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1256227999


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +107,127 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+      // Preloading the segments with snapshots for fast upsert metadata recovery.
+      // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+      // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+      // The thread doing the segment preloading here must complete before the other helix threads start to handle
+      // segment state transitions. This is ensured implicitly because segment preloading happens here when
+      // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+      // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+      try {
+        _isPreloading = true;
+        preloadSegments();
+      } catch (Exception e) {
+        // Even if preloading fails, we should continue to complete the initialization, so that TableDataManager can be

Review Comment:
   Good point about finer handling; will update.
   
   But at a higher level, we'd better not fail the `init()` method due to a preloading failure, so that the tableDataMgr can still be created and we fall back to the normal loading flow. If `init()` kept failing due to an uncaught preloading failure, creation of the tableDataMgr would be retried, leading to another round of preloading each time `init()` is called again.
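For the finer-handling point, a minimal hypothetical sketch of waiting on per-segment preload futures without letting any failure escape `init()`: an `ExecutionException` only marks that one segment as not preloaded, an `InterruptedException` restores the interrupt flag and stops waiting, and `_isPreloading` is reset in a `finally` block, mirroring the earlier revision of `init()`. `TolerantPreloadSketch` and its methods are illustrative names; real code would log each failure instead of just counting it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch: preloading failures are counted but never propagate out of init(),
// so the TableDataManager can still be created and normal loading takes over.
final class TolerantPreloadSketch {
  private volatile boolean _isPreloading;
  private int _failedPreloads;

  void init(ExecutorService executor, List<Runnable> preloadTasks) {
    _isPreloading = true;
    try {
      _failedPreloads = preloadAll(executor, preloadTasks);
    } catch (RuntimeException e) {
      // Any other failure is swallowed too; conservatively count every segment as not preloaded.
      _failedPreloads = preloadTasks.size();
    } finally {
      _isPreloading = false;
    }
  }

  boolean isPreloading() {
    return _isPreloading;
  }

  int failedPreloads() {
    return _failedPreloads;
  }

  /** Submits all tasks, waits for them, and returns how many did not complete successfully. */
  private static int preloadAll(ExecutorService executor, List<Runnable> preloadTasks) {
    List<Future<?>> futures = new ArrayList<>();
    for (Runnable task : preloadTasks) {
      futures.add(executor.submit(task));
    }
    int failures = 0;
    for (int i = 0; i < futures.size(); i++) {
      try {
        futures.get(i).get();
      } catch (ExecutionException e) {
        failures++; // this segment falls back to normal loading
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return failures + (futures.size() - i); // treat unconfirmed tasks as not preloaded
      }
    }
    return failures;
  }
}
```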



[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1254713434


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +108,113 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+        // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+        // The thread doing the segment preloading here must complete before the other helix threads start to handle
+        // segment state transitions. This is ensured implicitly because segment preloading happens here when
+        // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+        // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+        _isPreloading = true;
+        preloadSegments();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Got interrupted to preload segments for table: " + _tableNameWithType, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to preload segments for table: " + _tableNameWithType, e);
+    } finally {
+      _isPreloading = false;
+    }
+  }
+
+  /**
+   * Get the ideal state and find segments assigned to current instance, then preload those with validDocIds snapshot.
+   * Skip those without the snapshots and those whose crc has changed, as they will be handled by normal Helix state
+   * transitions, which will proceed after the preloading phase fully completes.
+   */
+  private void preloadSegments()
+      throws ExecutionException, InterruptedException {
+    LOGGER.info("Preload segments for table: {} fast upsert metadata recovery", _tableNameWithType);
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, _tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore();
+    String instanceId = getInstanceId();
+    IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+    List<Future<?>> futures = new ArrayList<>();
+    for (String segmentName : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+      String state = instanceStateMap.get(instanceId);
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+        LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", segmentName, state);
+        continue;
+      }
+      if (_segmentPreloadExecutor == null) {
+        preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+        continue;
+      }
+      futures.add(_segmentPreloadExecutor.submit(() -> {
+        preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+      }));
+    }
+    for (Future<?> f : futures) {
+      f.get();

Review Comment:
   Good point. I had thought about a similar question about preloading failure, but mainly from the angle that segments that failed to be preloaded here could later be loaded by other Helix threads via the normal segment loading flow. But I found an issue while thinking about your question again:
   
   Basically, because the tableDataMgr could not be created due to the init() failure, the next segment state transition would try to create the tableDataMgr again, which could kick off another round of segment preloading. If that failed again, then in the worst case we'd end up in an error loop until preloading could complete. The feature flag could disable preloading if this error loop ever happened, but I think this just shows that we'd better not fail init() due to a preloading failure. Then the tableDataMgr could be created and put into the ConcurrentHashMap, so that no more preloading would happen, 'gracefully' falling back to normal (yet costly) segment loading.
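The error loop described above follows from a documented `ConcurrentHashMap.computeIfAbsent()` rule: if the mapping function throws, the exception is rethrown and no mapping is recorded, so the next caller runs the function again. This hypothetical sketch simulates repeated state transitions with a preload step that always fails (all names are placeholders), comparing an `init()` that rethrows with one that swallows the failure.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation: the map plays the instance data manager's table data manager map,
// and each loop iteration plays one segment state transition for the same table.
final class PreloadRetryLoopDemo {
  private PreloadRetryLoopDemo() {
  }

  // Placeholder for preloadSegments(); always fails in this demo.
  private static void preloadSegments() {
    throw new IllegalStateException("simulated preloading failure");
  }

  /** Returns how many times preloading ran across the given number of state transitions. */
  static int countPreloadRounds(int transitions, boolean swallowPreloadFailure) {
    ConcurrentHashMap<String, Object> tableDataManagerMap = new ConcurrentHashMap<>();
    AtomicInteger preloadRounds = new AtomicInteger();
    for (int i = 0; i < transitions; i++) {
      try {
        tableDataManagerMap.computeIfAbsent("myTable_REALTIME", k -> {
          preloadRounds.incrementAndGet();
          if (swallowPreloadFailure) {
            try {
              preloadSegments();
            } catch (IllegalStateException e) {
              // init() logs and continues: the manager is stored, normal loading takes over.
            }
          } else {
            preloadSegments(); // init() fails: the exception escapes and nothing is stored
          }
          return new Object(); // stands in for the created TableDataManager
        });
      } catch (IllegalStateException e) {
        // This state transition fails; the next one tries to create the manager again.
      }
    }
    return preloadRounds.get();
  }
}
```

With three simulated transitions, the rethrowing variant preloads three times, while the tolerant variant preloads once and leaves the rest to normal loading.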



[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1248858373


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -42,10 +73,19 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   protected PartialUpsertHandler _partialUpsertHandler;
   protected boolean _enableSnapshot;
   protected ServerMetrics _serverMetrics;
+  private HelixManager _helixManager;
+  private ExecutorService _preloadExecutor;
+  private volatile boolean _isReady = false;
+  private final Lock _isReadyLock = new ReentrantLock();
+  private final Condition _isReadyCon = _isReadyLock.newCondition();
+  private final Set<String> _preloadingSegmentsSet = ConcurrentHashMap.newKeySet();

Review Comment:
   This set is not necessary, as we can check the isReady flag via the `isReady()` method.



[GitHub] [pinot] klsince commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1248858243


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +115,145 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _preloadExecutor = preloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        preloadSegments();

Review Comment:
   Good point! The `concurrentMap.computeIfAbsent()` call used to conditionally create the table data mgr in InstanceDataMgr is functioning as the lock. This actually reminded me of the logs I observed in my tests.



[GitHub] [pinot] codecov-commenter commented on pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11020:
URL: https://github.com/apache/pinot/pull/11020#issuecomment-1615258595

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11020](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (b41bf77) into [master](https://app.codecov.io/gh/apache/pinot/commit/0b097a8dab7d2953d331010c856420967fe873e1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (0b097a8) will **increase** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #11020     +/-   ##
   =========================================
     Coverage    0.11%    0.11%             
   =========================================
     Files        2192     2138     -54     
     Lines      118016   115618   -2398     
     Branches    17869    17577    -292     
   =========================================
     Hits          137      137             
   + Misses     117859   115461   -2398     
     Partials       20       20             
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1temurin11 | `?` | |
   | integration1temurin17 | `?` | |
   | integration1temurin20 | `?` | |
   | integration2temurin11 | `?` | |
   | integration2temurin17 | `?` | |
   | unittests1temurin17 | `?` | |
   | unittests1temurin20 | `?` | |
   | unittests2temurin11 | `?` | |
   | unittests2temurin17 | `?` | |
   | unittests2temurin20 | `0.11% <0.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [.../pinot/core/data/manager/BaseTableDataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvQmFzZVRhYmxlRGF0YU1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...data/manager/offline/TableDataManagerProvider.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvb2ZmbGluZS9UYWJsZURhdGFNYW5hZ2VyUHJvdmlkZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ata/manager/realtime/RealtimeTableDataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUmVhbHRpbWVUYWJsZURhdGFNYW5hZ2VyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...t/segment/local/data/manager/TableDataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9kYXRhL21hbmFnZXIvVGFibGVEYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...cal/upsert/BasePartitionUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQmFzZVBhcnRpdGlvblVwc2VydE1ldGFkYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...t/local/upsert/BaseTableUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQmFzZVRhYmxlVXBzZXJ0TWV0YWRhdGFNYW5hZ2VyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...t/ConcurrentMapPartitionUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQ29uY3VycmVudE1hcFBhcnRpdGlvblVwc2VydE1ldGFkYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...psert/ConcurrentMapTableUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQ29uY3VycmVudE1hcFRhYmxlVXBzZXJ0TWV0YWRhdGFNYW5hZ2VyLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...ocal/upsert/TableUpsertMetadataManagerFactory.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvVGFibGVVcHNlcnRNZXRhZGF0YU1hbmFnZXJGYWN0b3J5LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...rg/apache/pinot/spi/config/table/UpsertConfig.java](https://app.codecov.io/gh/apache/pinot/pull/11020?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL1Vwc2VydENvbmZpZy5qYXZh) | `0.00% <0.00%> (ø)` | |
   
   ... and [56 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11020/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1248613256


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -135,6 +136,11 @@ public synchronized void init(PinotConfiguration config, HelixManager helixManag
     // used to initialize a segment refresh semaphore to limit the parallelism, so create a pool of same size.
     _segmentRefreshExecutor = Executors.newFixedThreadPool(getMaxParallelRefreshThreads(),
         new ThreadFactoryBuilder().setNameFormat("segment-refresh-thread-%d").build());
+    LOGGER.info("Initialized segment refresh executor thread pool: {}", getMaxParallelRefreshThreads());

Review Comment:
   (minor) Put `getMaxParallelRefreshThreads()` and `getMaxSegmentPreloadThreads()` into local variables
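   A minimal sketch of this refactor, assuming a stripped-down stand-in for the executor setup in `HelixInstanceDataManager.init()`. The class `ExecutorInitSketch`, its constructor, and the `namedThreadFactory` helper are hypothetical; the `ThreadFactoryBuilder` used in the diff is replaced by a plain JDK `ThreadFactory` so the snippet compiles on its own. Reading each getter once guarantees that the pool size and the logged value cannot drift apart.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ThreadFactory;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical stand-in for the executor setup in HelixInstanceDataManager.init(); not Pinot code.
   class ExecutorInitSketch {
     private final int _configuredRefreshThreads;
     private final int _configuredPreloadThreads;
     ExecutorService _segmentRefreshExecutor;
     ExecutorService _segmentPreloadExecutor;

     ExecutorInitSketch(int configuredRefreshThreads, int configuredPreloadThreads) {
       _configuredRefreshThreads = configuredRefreshThreads;
       _configuredPreloadThreads = configuredPreloadThreads;
     }

     // Stand-ins for the config getters named in the review comment.
     int getMaxParallelRefreshThreads() {
       return _configuredRefreshThreads;
     }

     int getMaxSegmentPreloadThreads() {
       return _configuredPreloadThreads;
     }

     void init() {
       // Read each value once, so the pool size and the log line always agree.
       int maxParallelRefreshThreads = getMaxParallelRefreshThreads();
       _segmentRefreshExecutor =
           Executors.newFixedThreadPool(maxParallelRefreshThreads, namedThreadFactory("segment-refresh-thread-"));
       System.out.println("Initialized segment refresh executor thread pool: " + maxParallelRefreshThreads);

       // This sketch assumes a positive preload thread count; newFixedThreadPool rejects 0.
       int maxSegmentPreloadThreads = getMaxSegmentPreloadThreads();
       _segmentPreloadExecutor =
           Executors.newFixedThreadPool(maxSegmentPreloadThreads, namedThreadFactory("segment-preload-thread-"));
       System.out.println("Initialized segment preload executor thread pool: " + maxSegmentPreloadThreads);
     }

     void shutdown() {
       _segmentRefreshExecutor.shutdownNow();
       _segmentPreloadExecutor.shutdownNow();
     }

     // JDK-only replacement for ThreadFactoryBuilder().setNameFormat("...-%d").
     private static ThreadFactory namedThreadFactory(String prefix) {
       AtomicInteger counter = new AtomicInteger();
       return runnable -> new Thread(runnable, prefix + counter.getAndIncrement());
     }
   }
   ```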



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -135,6 +136,11 @@ public synchronized void init(PinotConfiguration config, HelixManager helixManag
     // used to initialize a segment refresh semaphore to limit the parallelism, so create a pool of same size.
     _segmentRefreshExecutor = Executors.newFixedThreadPool(getMaxParallelRefreshThreads(),
         new ThreadFactoryBuilder().setNameFormat("segment-refresh-thread-%d").build());
+    LOGGER.info("Initialized segment refresh executor thread pool: {}", getMaxParallelRefreshThreads());
+    _segmentPreloadExecutor = Executors.newFixedThreadPool(_instanceDataManagerConfig.getMaxSegmentPreloadThreads(),

Review Comment:
   Do we always want to create this executor, or should we skip creating it by default (0 preload threads)? Currently the table data manager can take a nullable executor.
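   One way to act on this suggestion, sketched under the assumption that a preload thread count of 0 means "preloading disabled": create the executor only for a positive count and pass `null` otherwise, which the table data manager already tolerates per the comment above. `PreloadExecutors`, `createOrNull`, and `shouldPreload` are hypothetical names. Note that `Executors.newFixedThreadPool(0)` throws `IllegalArgumentException`, so the guard is needed for correctness, not only to avoid idle threads.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Hypothetical helper; names and defaults are illustrative, not Pinot's actual API.
   final class PreloadExecutors {
     private PreloadExecutors() {
     }

     /** Returns a fixed-size pool, or null when preloading is disabled (numThreads <= 0). */
     static ExecutorService createOrNull(int numThreads) {
       if (numThreads <= 0) {
         // Executors.newFixedThreadPool(0) would throw IllegalArgumentException.
         return null;
       }
       return Executors.newFixedThreadPool(numThreads);
     }

     /** A null executor means "preloading unavailable", even if the table config enables it. */
     static boolean shouldPreload(ExecutorService segmentPreloadExecutor, boolean enablePreload) {
       return enablePreload && segmentPreloadExecutor != null;
     }
   }
   ```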



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -99,6 +100,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected File _resourceTmpDir;
   protected Logger _logger;
   protected HelixManager _helixManager;
+  protected ExecutorService _preloadExecutor;

Review Comment:
   (nit) `_segmentPreloadExecutor` to be more specific



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java:
##########
@@ -126,6 +130,15 @@ void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConf
    */
   void removeSegment(String segmentName);
 
+  default boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata) {
+    return false;
+  }
+
+  default File getSegmentDataDir(String segmentName, @Nullable String segmentTier, TableConfig tableConfig) {
+    return null;
+  }

Review Comment:
   (minor) I don't think we need to provide default implementations for them. This interface is internal, and the base class already implements them.
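   A sketch of the alternative, with simplified, hypothetical signatures: the `IndexLoadingConfig`, `SegmentZKMetadata`, and `TableConfig` parameters from the diff are dropped, and the tier-directory layout is made up for illustration. Declaring the methods abstract in the internal interface lets the compiler flag any implementation that forgets to override them, whereas defaults returning `false`/`null` would only fail at runtime.

   ```java
   import java.io.File;

   // Hypothetical, simplified version of the internal interface: no default bodies.
   interface SegmentDirLookup {
     boolean tryLoadExistingSegment(String segmentName);

     File getSegmentDataDir(String segmentName, String segmentTier);
   }

   // Hypothetical base class: it supplies the only implementations, and subclasses inherit them.
   abstract class BaseSegmentDirLookup implements SegmentDirLookup {
     private final File _tableDataDir;

     BaseSegmentDirLookup(File tableDataDir) {
       _tableDataDir = tableDataDir;
     }

     @Override
     public File getSegmentDataDir(String segmentName, String segmentTier) {
       // Illustrative layout only: tiered segments live under a per-tier subdirectory.
       File baseDir = segmentTier == null ? _tableDataDir : new File(_tableDataDir, segmentTier);
       return new File(baseDir, segmentName);
     }

     @Override
     public boolean tryLoadExistingSegment(String segmentName) {
       // Placeholder check; a real implementation would do more than test that the directory exists.
       return getSegmentDataDir(segmentName, null).isDirectory();
     }
   }
   ```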



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -190,7 +192,11 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
       if (queryableDocIds == null && _deleteRecordColumn != null) {
         queryableDocIds = new ThreadSafeMutableRoaringBitmap();
       }
-      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+      if (_tableUpsertMetadataManager.isPreloadingSegment(segmentName)) {

Review Comment:
   I feel it would be cleaner to add `preloadSegment(ImmutableSegment segment)` to `PartitionUpsertMetadataManager` and call it directly in `RealtimeTableDataManager.handleUpsert()`. This way we don't need to pass `TableUpsertMetadataManager` into `PartitionUpsertMetadataManager`, and during preloading we can perform extra checks, such as whether the snapshot exists.
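   A minimal sketch of the suggested shape, under heavy simplifying assumptions: segments are modeled as lists of primary keys and comparison values, the validDocIds snapshot is a `java.util.BitSet`, and the regular path only resolves conflicts in the key map (updating the existing segments' validDocIds is omitted). All classes here are hypothetical stand-ins for their Pinot counterparts. The caller chooses `preloadSegment()` explicitly, so the partition manager needs no back-reference to `TableUpsertMetadataManager`, and `preloadSegment()` is the natural home for the snapshot-exists check.

   ```java
   import java.util.BitSet;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical, heavily simplified model of the suggestion; none of these classes are Pinot's.
   final class UpsertPreloadSketch {
     private UpsertPreloadSketch() {
     }

     /** One primary key and comparison value per doc, plus an optional validDocIds snapshot. */
     static final class Segment {
       final String _name;
       final List<String> _primaryKeys;
       final List<Long> _comparisonValues;
       final BitSet _validDocIdsSnapshot; // null when no snapshot exists

       Segment(String name, List<String> primaryKeys, List<Long> comparisonValues, BitSet validDocIdsSnapshot) {
         _name = name;
         _primaryKeys = primaryKeys;
         _comparisonValues = comparisonValues;
         _validDocIdsSnapshot = validDocIdsSnapshot;
       }
     }

     /** Location of the latest record seen for a primary key. */
     static final class Location {
       final String _segmentName;
       final int _docId;
       final long _comparisonValue;

       Location(String segmentName, int docId, long comparisonValue) {
         _segmentName = segmentName;
         _docId = docId;
         _comparisonValue = comparisonValue;
       }
     }

     static final class PartitionUpsertMetadataManager {
       final Map<String, Location> _primaryKeyToLocation = new ConcurrentHashMap<>();

       /** Fast path: valid-doc keys across snapshotted segments are unique, so no conflict check. */
       void preloadSegment(Segment segment) {
         BitSet validDocIds = segment._validDocIdsSnapshot;
         if (validDocIds == null) {
           // The kind of extra check the reviewer mentions: refuse to preload without a snapshot.
           throw new IllegalStateException("No validDocIds snapshot for segment: " + segment._name);
         }
         for (int docId = validDocIds.nextSetBit(0); docId >= 0; docId = validDocIds.nextSetBit(docId + 1)) {
           _primaryKeyToLocation.put(segment._primaryKeys.get(docId),
               new Location(segment._name, docId, segment._comparisonValues.get(docId)));
         }
       }

       /** Regular path: keep the larger comparison value (in this sketch, ties go to the incoming record). */
       void addSegment(Segment segment) {
         for (int docId = 0; docId < segment._primaryKeys.size(); docId++) {
           Location incoming = new Location(segment._name, docId, segment._comparisonValues.get(docId));
           _primaryKeyToLocation.merge(segment._primaryKeys.get(docId), incoming,
               (current, candidate) -> candidate._comparisonValue >= current._comparisonValue ? candidate : current);
         }
       }
     }

     /** Caller in the spirit of handleUpsert(): it picks the path explicitly. */
     static void handleUpsert(PartitionUpsertMetadataManager manager, Segment segment, boolean isPreloading) {
       if (isPreloading && segment._validDocIdsSnapshot != null) {
         manager.preloadSegment(segment);
       } else {
         manager.addSegment(segment);
       }
     }
   }
   ```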



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java:
##########
@@ -43,4 +46,9 @@ public interface TableUpsertMetadataManager extends Closeable {
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
   void stop();
+
+  void waitTillReady()

Review Comment:
   Add some javadoc for the newly added method.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -42,10 +73,19 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   protected PartialUpsertHandler _partialUpsertHandler;
   protected boolean _enableSnapshot;
   protected ServerMetrics _serverMetrics;
+  private HelixManager _helixManager;
+  private ExecutorService _preloadExecutor;
+  private volatile boolean _isReady = false;
+  private final Lock _isReadyLock = new ReentrantLock();
+  private final Condition _isReadyCon = _isReadyLock.newCondition();
+  private final Set<String> _preloadingSegmentsSet = ConcurrentHashMap.newKeySet();

Review Comment:
   Do we need to track this set? Until the manager reports that preloading has finished, every segment being added is a preloaded segment.
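   To illustrate the point, assuming Helix threads are blocked until preloading finishes: while the table is still preloading, every `addSegment` call must come from a preload thread, so a single `volatile` flag answers the question the per-segment set was tracking. `PreloadFlagSketch` and its members are hypothetical.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical sketch: one volatile flag replaces the Set<String> of preloading segments.
   final class PreloadFlagSketch {
     // Written once by the preloading code; read by every addSegment() call.
     private volatile boolean _preloadDone;
     final AtomicInteger _preloadedSegments = new AtomicInteger();
     final AtomicInteger _regularlyAddedSegments = new AtomicInteger();

     void markPreloadDone() {
       _preloadDone = true;
     }

     /** No per-segment lookup: before preloading is done, any caller is a preload thread. */
     boolean isPreloading() {
       return !_preloadDone;
     }

     void addSegment(String segmentName) {
       if (isPreloading()) {
         _preloadedSegments.incrementAndGet(); // stands in for the fast path without duplicate checks
       } else {
         _regularlyAddedSegments.incrementAndGet(); // stands in for the regular path with duplicate checks
       }
     }
   }
   ```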



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +115,145 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _preloadExecutor = preloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        preloadSegments();

Review Comment:
   (MAJOR) Currently `init()` blocks while preloading all the segments, which prevents the table data manager from being created until preloading finishes. It might work, but it is not the intended behavior. With the current code we don't even need the lock, because the table data manager won't be created before preloading is done.
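   A sketch of one direction this comment points to, assuming preloading moves onto the shared preload executor so that `init()` returns right away and the table data manager can be created. The lock, condition, and flag mirror the fields in the diff (`_isReadyLock`, `_isReadyCon`, `_isReady`); everything else, including `AsyncPreloadSketch`, `logEvent`, and `loadOrder`, is hypothetical. Once `init()` stops blocking, the lock becomes necessary: Helix threads that add segments must wait until preloading signals completion.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.Lock;
   import java.util.concurrent.locks.ReentrantLock;

   // Hypothetical sketch of a non-blocking init(); not Pinot's BaseTableUpsertMetadataManager.
   class AsyncPreloadSketch {
     private final Lock _isReadyLock = new ReentrantLock();
     private final Condition _isReadyCon = _isReadyLock.newCondition();
     private volatile boolean _isReady = false;
     private final List<String> _loadOrder = new ArrayList<>(); // guarded by 'this'

     /** Returns immediately; preloading runs on the shared executor. */
     void init(ExecutorService segmentPreloadExecutor, List<String> segmentsWithSnapshots) {
       segmentPreloadExecutor.submit(() -> {
         try {
           for (String segmentName : segmentsWithSnapshots) {
             logEvent("preload:" + segmentName);
           }
         } finally {
           // Release waiters even if preloading failed, so Helix threads are never stuck.
           _isReadyLock.lock();
           try {
             _isReady = true;
             _isReadyCon.signalAll();
           } finally {
             _isReadyLock.unlock();
           }
         }
       });
     }

     /** Called by Helix threads before adding a segment the regular way. */
     void waitTillPreloadIsDone()
         throws InterruptedException {
       if (_isReady) {
         return; // fast path once preloading has finished
       }
       _isReadyLock.lock();
       try {
         while (!_isReady) {
           _isReadyCon.await(); // the loop guards against spurious wakeups
         }
       } finally {
         _isReadyLock.unlock();
       }
     }

     void addSegment(String segmentName)
         throws InterruptedException {
       waitTillPreloadIsDone();
       logEvent("add:" + segmentName);
     }

     private synchronized void logEvent(String event) {
       _loadOrder.add(event);
     }

     synchronized List<String> loadOrder() {
       return new ArrayList<>(_loadOrder);
     }
   }
   ```

   The `volatile` read before taking the lock keeps the common case (preloading long finished) cheap, while the `while` loop under the lock handles threads that arrive mid-preload.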



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java:
##########
@@ -43,4 +46,9 @@ public interface TableUpsertMetadataManager extends Closeable {
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
   void stop();
+
+  void waitTillReady()

Review Comment:
   Should we make the name more specific, e.g. `waitTillReadyToAddData` or `waitTillPreloadIsDone`?
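   A sketch that combines both comments on this method (a more specific name plus javadoc). `PreloadAwareUpsertMetadataManager` and `LatchBasedUpsertMetadataManager` are hypothetical, and the javadoc wording is an assumption about the intended contract. The implementation deliberately swaps in a `java.util.concurrent.CountDownLatch` for the `Lock`/`Condition` pair in the diff, since a one-shot "done" signal is exactly what a latch models.

   ```java
   import java.util.concurrent.CountDownLatch;

   // Hypothetical slice of TableUpsertMetadataManager with a more specific name and javadoc.
   interface PreloadAwareUpsertMetadataManager {
     /**
      * Blocks the calling thread until segment preloading has finished (or was never enabled), after
      * which segments can be added through the regular path that checks for duplicate primary keys.
      * Returns immediately if preloading is already done.
      *
      * @throws InterruptedException if the calling thread is interrupted while waiting
      */
     void waitTillPreloadIsDone()
         throws InterruptedException;
   }

   // Hypothetical implementation using a CountDownLatch instead of the diff's Lock/Condition pair.
   class LatchBasedUpsertMetadataManager implements PreloadAwareUpsertMetadataManager {
     private final CountDownLatch _preloadDone = new CountDownLatch(1);

     /** Called once by the preloading code (or right away in init() when preloading is disabled). */
     void markPreloadDone() {
       _preloadDone.countDown();
     }

     @Override
     public void waitTillPreloadIsDone()
         throws InterruptedException {
       _preloadDone.await();
     }

     boolean isPreloadDone() {
       return _preloadDone.getCount() == 0;
     }
   }
   ```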


