Posted to commits@pinot.apache.org by "itschrispeck (via GitHub)" <gi...@apache.org> on 2023/11/17 01:03:29 UTC

[PR] make deep store upload retry async with configurable parallelism [pinot]

itschrispeck opened a new pull request, #12017:
URL: https://github.com/apache/pinot/pull/12017

   Relates to: https://github.com/apache/pinot/issues/11874
   
   This PR makes the deep store upload retry an asynchronous operation. By default, the thread pool that performs the uploads is limited to a single thread, which is evicted when unused.
   
   A new config is introduced to allow for increasing the maximum number of threads that can be used for retrying segment uploads: `controller.realtime.segment.deepStoreUploadRetry.parallelism`
   
   A new metric is emitted to track the upload retry queue size. It gives visibility into scenarios where segment upload retry cannot keep up with new segments that are missing their deep store copy: `LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE`
   
   Tested via a cluster deployment: edited ZK to remove segment download URLs, which triggered the upload retries, and verified that the segments were uploaded.
   
   tags: `enhancement`
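
For readers following along, here is a minimal sketch of the kind of executor the description implies: a pool capped at a configurable number of threads whose idle threads are released. The class name, method name, and the one-minute keep-alive are assumptions made for illustration, not the code in this PR.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UploadRetryExecutorSketch {
  // Hypothetical helper: at most `parallelism` worker threads, unbounded task queue.
  static ThreadPoolExecutor newUploadRetryExecutor(int parallelism) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        parallelism, parallelism, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
    // Allow core threads to time out as well, so an idle pool holds no threads.
    executor.allowCoreThreadTimeOut(true);
    return executor;
  }

  public static void main(String[] args) throws InterruptedException {
    // Parallelism of 1 mirrors the default described above.
    ThreadPoolExecutor executor = newUploadRetryExecutor(1);
    executor.submit(() -> System.out.println("retrying upload for one segment"));
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

With an unbounded queue, extra tasks wait in the queue rather than spawning more threads, which is why a queue-size metric is a natural signal for a backlog.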


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] make deep store upload retry async with configurable parallelism [pinot]

Posted by "chenboat (via GitHub)" <gi...@apache.org>.
chenboat merged PR #12017:
URL: https://github.com/apache/pinot/pull/12017




Re: [PR] make deep store upload retry async with configurable parallelism [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12017:
URL: https://github.com/apache/pinot/pull/12017#discussion_r1408522908


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +916,9 @@ public void testCommitSegmentMetadata() {
   /**
    * Test cases for fixing LLC segment by uploading to segment store if missing
    */
-  @Test
+  @Test(timeOut = 30_000L)

Review Comment:
   ```suggestion
     @Test
   ```



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +916,9 @@ public void testCommitSegmentMetadata() {
   /**
    * Test cases for fixing LLC segment by uploading to segment store if missing
    */
-  @Test
+  @Test(timeOut = 30_000L)
   public void testUploadToSegmentStore()
-      throws HttpErrorStatusException, IOException, URISyntaxException {
+      throws HttpErrorStatusException, IOException, URISyntaxException, InterruptedException {

Review Comment:
   ```suggestion
         throws HttpErrorStatusException, IOException, URISyntaxException {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1405,48 +1414,77 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
           continue;
         }
         // Skip the fix for the segment if it is already out of retention.
-        if (retentionStrategy != null && retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
-          LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention", segmentName);
+        if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention",
+              segmentName);
           continue;
         }
-        LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
-
-        // Find servers which have online replica
-        List<URI> peerSegmentURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
-        if (peerSegmentURIs.isEmpty()) {
-          throw new IllegalStateException(
-              String.format("Failed to upload segment %s to deep store because no online replica is found",
-                  segmentName));
-        }
-
-        // Randomly ask one server to upload
-        URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
-        String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
-        serverUploadRequestUrl =
-            String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
-        LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
-            serverUploadRequestUrl);
-        String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
-        String segmentDownloadUrl =
-            moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
-        LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
-        // Update segment ZK metadata by adding the download URL
-        segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
-        // TODO: add version check when persist segment ZK metadata
-        persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
-        LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
-            segmentDownloadUrl);
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
       } catch (Exception e) {
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
-        LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);
+        LOGGER.warn("Failed checking segment deep store URL for segment {}", segmentName);
+      }
+
+      // Skip the fix if an upload is already queued for this segment
+      if (_deepStoreUploadExecutorPendingSegments.contains(segmentName)) {
+        continue;
       }
+      _deepStoreUploadExecutorPendingSegments.add(segmentName);

Review Comment:
   Should we also update the queue size here?
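
One way to read this suggestion, sketched under assumptions: keep the gauge in step with the backlog by updating it both when a task is enqueued and when it finishes. The `AtomicLong` below is only a stand-in for the controller's metrics gauge, and the class and method names are hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

final class QueueSizeGaugeSketch {
  // Stand-in for a metrics gauge; a real controller would publish this value through its metrics registry.
  private final AtomicLong _queueSizeGauge = new AtomicLong();
  private final Executor _executor;

  QueueSizeGaugeSketch(Executor executor) {
    _executor = executor;
  }

  void enqueue(Runnable upload) {
    // Update the gauge at enqueue time so a growing backlog is visible immediately.
    _queueSizeGauge.incrementAndGet();
    _executor.execute(() -> {
      try {
        upload.run();
      } finally {
        // Update again on completion, including when the upload throws.
        _queueSizeGauge.decrementAndGet();
      }
    });
  }

  long queueSize() {
    return _queueSizeGauge.get();
  }

  public static void main(String[] args) {
    // Runnable::run executes each task inline, which keeps the demo deterministic.
    QueueSizeGaugeSketch sketch = new QueueSizeGaugeSketch(Runnable::run);
    sketch.enqueue(() -> System.out.println("uploading segment"));
    System.out.println("queue size after completion: " + sketch.queueSize());
  }
}
```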



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1405,48 +1414,77 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
           continue;
         }
         // Skip the fix for the segment if it is already out of retention.
-        if (retentionStrategy != null && retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
-          LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention", segmentName);
+        if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention",
+              segmentName);
           continue;
         }
-        LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
-
-        // Find servers which have online replica
-        List<URI> peerSegmentURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
-        if (peerSegmentURIs.isEmpty()) {
-          throw new IllegalStateException(
-              String.format("Failed to upload segment %s to deep store because no online replica is found",
-                  segmentName));
-        }
-
-        // Randomly ask one server to upload
-        URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
-        String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
-        serverUploadRequestUrl =
-            String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
-        LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
-            serverUploadRequestUrl);
-        String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
-        String segmentDownloadUrl =
-            moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
-        LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
-        // Update segment ZK metadata by adding the download URL
-        segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
-        // TODO: add version check when persist segment ZK metadata
-        persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
-        LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
-            segmentDownloadUrl);
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
       } catch (Exception e) {
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
-        LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);
+        LOGGER.warn("Failed checking segment deep store URL for segment {}", segmentName);
+      }
+
+      // Skip the fix if an upload is already queued for this segment
+      if (_deepStoreUploadExecutorPendingSegments.contains(segmentName)) {
+        continue;
       }
+      _deepStoreUploadExecutorPendingSegments.add(segmentName);

Review Comment:
   (minor)
   ```suggestion
         // Skip the fix if an upload is already queued for this segment
         if (!_deepStoreUploadExecutorPendingSegments.add(segmentName)) {
           continue;
         }
   ```
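
The suggestion relies on `Set.add` returning `false` when the element is already present, which folds the check and the insert into one call. A small sketch, assuming the pending set is backed by `ConcurrentHashMap.newKeySet()` so the call is also atomic across threads; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PendingSegmentsSketch {
  // Thread-safe set standing in for _deepStoreUploadExecutorPendingSegments.
  private final Set<String> _pending = ConcurrentHashMap.newKeySet();

  // Returns true only for the caller that actually inserted the segment name.
  boolean tryEnqueue(String segmentName) {
    return _pending.add(segmentName);
  }

  // Called when an upload attempt finishes, whether it succeeded or failed.
  void markDone(String segmentName) {
    _pending.remove(segmentName);
  }

  int size() {
    return _pending.size();
  }

  public static void main(String[] args) {
    PendingSegmentsSketch pending = new PendingSegmentsSketch();
    System.out.println(pending.tryEnqueue("segment_0")); // true: first enqueue
    System.out.println(pending.tryEnqueue("segment_0")); // false: already pending, skip
    pending.markDone("segment_0");
    System.out.println(pending.tryEnqueue("segment_0")); // true: eligible for retry again
  }
}
```

With a separate `contains` followed by `add`, two threads could both pass the check before either inserts; the single `add` call avoids that window.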





Re: [PR] make deep store upload retry async with configurable parallelism [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12017:
URL: https://github.com/apache/pinot/pull/12017#discussion_r1398260187


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -1035,6 +1036,13 @@ public void testUploadToSegmentStore()
 
     // Verify the result
     segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);
+
+    // Block until all tasks have been able to complete
+    ThreadPoolExecutor deepStoreUploadExecutor = segmentManager.getDeepStoreUploadExecutor();
+    while (deepStoreUploadExecutor.getActiveCount() > 0) {

Review Comment:
   The active count might not always be accurate (e.g. when a task has not been scheduled yet). Shall we wait for downloadUrl to be set instead?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -194,6 +200,9 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
     _isTmpSegmentAsyncDeletionEnabled = controllerConf.isTmpSegmentAsyncDeletionEnabled();
     _deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs();
     _fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ? initFileUploadDownloadClient() : null;
+    _deepStoreUploadExecutor = _isDeepStoreLLCSegmentUploadRetryEnabled ? initDeepStoreUploadExecutorService(
+        controllerConf.getDeepStoreRetryUploadParallelism()) : null;
+    _deepStoreUploadExecutorPendingSegments = _isDeepStoreLLCSegmentUploadRetryEnabled ? new HashSet<>() : null;

Review Comment:
   We need a thread-safe set here.



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +915,9 @@ public void testCommitSegmentMetadata() {
   /**
    * Test cases for fixing LLC segment by uploading to segment store if missing
    */
-  @Test
+  @Test(timeOut = 30_000L)

Review Comment:
   Do not use timeout. We usually use `TestUtils.waitForCondition()`
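
For context, the polling pattern behind a helper like this can be sketched as follows. This is a hypothetical stand-in written for illustration; the name mirrors the comment, but the signature is an assumption and should not be read as Pinot's actual `TestUtils` API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;

final class WaitForConditionSketch {
  // Polls the condition until it holds or the timeout elapses, then fails with the given message.
  static void waitForCondition(BooleanSupplier condition, long checkIntervalMs, long timeoutMs,
      String errorMessage) {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (System.nanoTime() < deadlineNanos) {
      if (condition.getAsBoolean()) {
        return;
      }
      try {
        Thread.sleep(checkIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new AssertionError("Interrupted while waiting: " + errorMessage);
      }
    }
    // One last check so a condition that became true during the final sleep still passes.
    if (!condition.getAsBoolean()) {
      throw new AssertionError(errorMessage);
    }
  }

  public static void main(String[] args) {
    // Simulate an async upload that clears a pending-segment set when it finishes.
    Set<String> pending = ConcurrentHashMap.newKeySet();
    pending.add("segment_0");
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> pending.remove("segment_0"));
    waitForCondition(pending::isEmpty, 10L, 5_000L, "Timed out waiting for pending uploads to drain");
    executor.shutdown();
    System.out.println("pending uploads drained");
  }
}
```

Compared with a test-level timeout, waiting on an explicit condition fails with a message that names what was being waited for, and returns as soon as the condition holds.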





Re: [PR] make deep store upload retry async with configurable parallelism [pinot]

Posted by "itschrispeck (via GitHub)" <gi...@apache.org>.
itschrispeck commented on code in PR #12017:
URL: https://github.com/apache/pinot/pull/12017#discussion_r1402706477


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -1035,6 +1036,13 @@ public void testUploadToSegmentStore()
 
     // Verify the result
     segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);
+
+    // Block until all tasks have been able to complete
+    ThreadPoolExecutor deepStoreUploadExecutor = segmentManager.getDeepStoreUploadExecutor();
+    while (deepStoreUploadExecutor.getActiveCount() > 0) {

Review Comment:
   Changed to wait on the pending segment set size, since waiting on downloadUrl makes it difficult to correctly test the error cases where we assert that downloadUrl isn't set.





Re: [PR] make deep store upload retry async with configurable parallelism [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12017:
URL: https://github.com/apache/pinot/pull/12017#issuecomment-1815610673

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: `2 lines` in your changes are missing coverage. Please review.
   > Comparison is base [(`2beb9a4`)](https://app.codecov.io/gh/apache/pinot/commit/2beb9a4938d7d7c9481fd2546075e2a2475fe0ec?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.61% compared to head [(`3bf17df`)](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 46.77%.
   > Report is 19 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...g/apache/pinot/common/metrics/ControllerGauge.java](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyR2F1Z2UuamF2YQ==) | 0.00% | [2 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12017       +/-   ##
   =============================================
   - Coverage     61.61%   46.77%   -14.84%     
   - Complexity      207      944      +737     
   =============================================
     Files          2385     1787      -598     
     Lines        129214    93696    -35518     
     Branches      20003    15158     -4845     
   =============================================
   - Hits          79613    43826    -35787     
   - Misses        43801    46750     +2949     
   + Partials       5800     3120     -2680     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (-14.71%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (+19.19%)` | :arrow_up: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (-14.84%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (-14.84%)` | :arrow_down: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (-0.17%)` | :arrow_down: |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   




Re: [PR] make deep store upload retry async with configurable parallelism [pinot]

Posted by "itschrispeck (via GitHub)" <gi...@apache.org>.
itschrispeck commented on PR #12017:
URL: https://github.com/apache/pinot/pull/12017#issuecomment-1835253767

   @Jackie-Jiang Thanks for the reviews! I've addressed the remaining comments.

