Posted to commits@pinot.apache.org by "navina (via GitHub)" <gi...@apache.org> on 2023/07/06 06:21:19 UTC

[GitHub] [pinot] navina commented on a diff in pull request #11020: allow to preload segments with upsert snapshots to speedup table loading

navina commented on code in PR #11020:
URL: https://github.com/apache/pinot/pull/11020#discussion_r1253977267


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java:
##########
@@ -64,6 +64,11 @@ public interface PartitionUpsertMetadataManager extends Closeable {
    */
   void addSegment(ImmutableSegment segment);
 
+  /**
+   * Different from adding a segment, when preloading a segment, the upsert metadata may be updated more efficiently.

Review Comment:
   nit: Make this more informative by saying "upsert metadata will be updated directly instead of via read-compare-update for each primary key".
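To illustrate the distinction the suggested wording draws, here is a minimal sketch. It assumes a simplified in-memory map from primary key to record location. `UpsertMetadataSketch`, `RecordLocation`, `addRecord` and `preloadRecord` are hypothetical names, not Pinot's actual classes or methods. The normal add path reads the current location for each key, compares, and updates only if the new record wins. The preload path trusts the validDocIds snapshot and writes the location directly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch, not Pinot's actual classes: contrasts the per-key
 * read-compare-update done when adding a segment with the direct update
 * that becomes possible when preloading from a validDocIds snapshot.
 */
public class UpsertMetadataSketch {

  /** Hypothetical location of the latest record for a primary key. */
  record RecordLocation(String segmentName, int docId, long comparisonValue) {}

  final Map<String, RecordLocation> primaryKeyToLocation = new ConcurrentHashMap<>();

  /** Normal add path: read the current location, compare, and keep whichever record wins. */
  void addRecord(String primaryKey, RecordLocation newLocation) {
    primaryKeyToLocation.merge(primaryKey, newLocation,
        (current, candidate) -> candidate.comparisonValue() >= current.comparisonValue() ? candidate : current);
  }

  /** Preload path: the snapshot already marks this doc as valid, so write it directly without reading or comparing. */
  void preloadRecord(String primaryKey, RecordLocation location) {
    primaryKeyToLocation.put(primaryKey, location);
  }

  public static void main(String[] args) {
    UpsertMetadataSketch sketch = new UpsertMetadataSketch();
    sketch.addRecord("pk1", new RecordLocation("seg_1", 0, 100L));
    sketch.addRecord("pk1", new RecordLocation("seg_0", 5, 50L)); // smaller comparison value, so it is ignored
    System.out.println(sketch.primaryKeyToLocation.get("pk1").segmentName()); // seg_1
    sketch.preloadRecord("pk2", new RecordLocation("seg_2", 3, 10L));
    System.out.println(sketch.primaryKeyToLocation.get("pk2").docId()); // 3
  }
}
```

The saving comes from skipping the per-key read and comparison, which is only safe when, as assumed here, the snapshot already guarantees that each preloaded doc is the winning record for its key.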



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -75,6 +108,113 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _serverMetrics = serverMetrics;
+    _helixManager = helixManager;
+    _segmentPreloadExecutor = segmentPreloadExecutor;
+    try {
+      if (_enableSnapshot && upsertConfig.isEnablePreload()) {
+        // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
+        // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
+        // The thread doing the segment preloading here must complete before the other helix threads start to handle
+        // segment state transitions. This is ensured implicitly because segment preloading happens here when
+        // initializing this TableUpsertMetadataManager, which happens when initializing the TableDataManager, which
+        // happens as the lambda of ConcurrentHashMap.computeIfAbsent() method, which ensures the waiting logic.
+        _isPreloading = true;
+        preloadSegments();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Got interrupted to preload segments for table: " + _tableNameWithType, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to preload segments for table: " + _tableNameWithType, e);
+    } finally {
+      _isPreloading = false;
+    }
+  }
+
+  /**
+   * Get the ideal state and find segments assigned to current instance, then preload those with validDocIds snapshot.
+   * Skip those without the snapshots and those whose crc has changed, as they will be handled by normal Helix state
+   * transitions, which will proceed after the preloading phase fully completes.
+   */
+  private void preloadSegments()
+      throws ExecutionException, InterruptedException {
+    LOGGER.info("Preload segments for table: {} fast upsert metadata recovery", _tableNameWithType);
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, _tableNameWithType);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore();
+    String instanceId = getInstanceId();
+    IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
+    List<Future<?>> futures = new ArrayList<>();
+    for (String segmentName : idealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+      String state = instanceStateMap.get(instanceId);
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+        LOGGER.info("Skip segment: {} as its ideal state: {} is not ONLINE", segmentName, state);
+        continue;
+      }
+      if (_segmentPreloadExecutor == null) {
+        preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+        continue;
+      }
+      futures.add(_segmentPreloadExecutor.submit(() -> {
+        preloadSegment(segmentName, indexLoadingConfig, propertyStore);
+      }));
+    }
+    for (Future<?> f : futures) {
+      f.get();

Review Comment:
   For my understanding: if any of the `preloadSegment` calls throws an exception, it is caught in the `init` method's try-catch block, which then throws a RuntimeException. Is that really necessary when a preload fails?
   Can we not treat a segment whose preload failed like any other segment that will eventually be loaded by the Helix state transition?
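A minimal sketch of what this suggestion could look like, assuming preloading is modeled as a plain `Consumer<String>` run on an executor. `TolerantPreloader` and `preloadAll` are hypothetical names, not code from this PR. A failure surfaced through `Future.get()` as an `ExecutionException` is logged and recorded, and the segment is left to the regular Helix transition. An `InterruptedException` is still propagated, since it means the whole preloading phase should stop.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the suggestion above: a failed preload is logged and
 * skipped instead of failing table initialization.
 */
public class TolerantPreloader {

  /** Preloads every segment and returns the names of segments whose preload failed. */
  static List<String> preloadAll(List<String> segmentNames, Consumer<String> preloadSegment,
      ExecutorService executor) throws InterruptedException {
    Map<String, Future<?>> futures = new LinkedHashMap<>();
    for (String segmentName : segmentNames) {
      futures.put(segmentName, executor.submit(() -> preloadSegment.accept(segmentName)));
    }
    List<String> failed = new ArrayList<>();
    for (Map.Entry<String, Future<?>> entry : futures.entrySet()) {
      try {
        entry.getValue().get();
      } catch (ExecutionException e) {
        // Not fatal here: the segment is assumed to be loaded later by the normal Helix state transition.
        System.err.println("Failed to preload segment: " + entry.getKey() + ", leaving it to Helix: " + e.getCause());
        failed.add(entry.getKey());
      }
    }
    return failed;
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      List<String> failed = preloadAll(List.of("seg_0", "seg_1", "seg_2"), segmentName -> {
        if (segmentName.equals("seg_1")) {
          throw new IllegalStateException("corrupted snapshot");
        }
      }, executor);
      System.out.println("Segments left to Helix: " + failed); // [seg_1]
    } finally {
      executor.shutdown();
    }
  }
}
```

One open question with this approach is whether a segment that failed partway through preloading leaves upsert metadata behind that must be cleaned up before Helix loads it again. The sketch does not address that.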



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
