Posted to commits@pinot.apache.org by "itschrispeck (via GitHub)" <gi...@apache.org> on 2023/11/17 01:03:29 UTC
[PR] make deep store upload retry async with configurable parallelism [pinot]
itschrispeck opened a new pull request, #12017:
URL: https://github.com/apache/pinot/pull/12017
Relates to: https://github.com/apache/pinot/issues/11874
This PR moves the deep store upload retry to be an asynchronous operation. By default, the thread pool that performs the uploads is limited to a single thread which is evicted when unused.
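A minimal sketch of a pool with that behavior, assuming a plain `java.util.concurrent.ThreadPoolExecutor`. `DeepStoreUploadExecutors.create` is a hypothetical name, not the PR's `initDeepStoreUploadExecutorService()`, whose exact settings may differ. Core and maximum sizes are both set to the configured parallelism, and `allowCoreThreadTimeOut(true)` lets even core threads exit after sitting idle:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory; not the PR's actual initDeepStoreUploadExecutorService().
final class DeepStoreUploadExecutors {
  private DeepStoreUploadExecutors() {
  }

  static ThreadPoolExecutor create(int parallelism, long idleTimeoutSeconds) {
    if (parallelism < 1) {
      throw new IllegalArgumentException("parallelism must be >= 1, got " + parallelism);
    }
    if (idleTimeoutSeconds <= 0) {
      // allowCoreThreadTimeOut(true) rejects a non-positive keep-alive time.
      throw new IllegalArgumentException("idleTimeoutSeconds must be > 0, got " + idleTimeoutSeconds);
    }
    // With an unbounded queue a ThreadPoolExecutor never grows past its core size,
    // so core == max == parallelism is what caps the number of concurrent uploads.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(parallelism, parallelism, idleTimeoutSeconds,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    // Let idle core threads exit too, so an unused pool holds no threads.
    executor.allowCoreThreadTimeOut(true);
    return executor;
  }
}
```

Threads are created lazily as tasks are submitted, and each one exits after `idleTimeoutSeconds` without work, so a pool that is never used holds no threads.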
A new config is introduced to allow for increasing the maximum number of threads that can be used for retrying segment uploads: `controller.realtime.segment.deepStoreUploadRetry.parallelism`
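A sketch of resolving this key with a default of 1, matching the single-thread default described above. It assumes a `java.util.Properties` source and a hypothetical `DeepStoreRetryConfig` class; the PR reads the value through `ControllerConf` (`getDeepStoreRetryUploadParallelism()`), whose parsing and validation may differ:

```java
import java.util.Properties;

// Hypothetical sketch of resolving the retry parallelism from controller properties.
final class DeepStoreRetryConfig {
  static final String PARALLELISM_KEY = "controller.realtime.segment.deepStoreUploadRetry.parallelism";
  // Assumed default: a single upload thread, as described in the PR.
  static final int DEFAULT_PARALLELISM = 1;

  private DeepStoreRetryConfig() {
  }

  static int parallelism(Properties props) {
    String raw = props.getProperty(PARALLELISM_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT_PARALLELISM;
    }
    int value = Integer.parseInt(raw.trim());
    if (value < 1) {
      throw new IllegalArgumentException(PARALLELISM_KEY + " must be >= 1, got " + value);
    }
    return value;
  }
}
```

In a controller config file, raising the limit would then be a single line such as `controller.realtime.segment.deepStoreUploadRetry.parallelism=4`.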
A new metric is emitted to track the upload retry queue size, giving visibility into scenarios where segment upload retries cannot keep up with the new segments that are missing their deep store copy: `LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE`
Tested via a cluster deployment: edited ZK to remove segment URLs, which triggered uploads, and verified that the segments were uploaded.
tags: `enhancement`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org
Re: [PR] make deep store upload retry async with configurable parallelism [pinot]
Posted by "chenboat (via GitHub)" <gi...@apache.org>.
chenboat merged PR #12017:
URL: https://github.com/apache/pinot/pull/12017
Re: [PR] make deep store upload retry async with configurable parallelism [pinot]
Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12017:
URL: https://github.com/apache/pinot/pull/12017#discussion_r1408522908
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +916,9 @@ public void testCommitSegmentMetadata() {
/**
* Test cases for fixing LLC segment by uploading to segment store if missing
*/
- @Test
+ @Test(timeOut = 30_000L)
Review Comment:
```suggestion
@Test
```
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +916,9 @@ public void testCommitSegmentMetadata() {
/**
* Test cases for fixing LLC segment by uploading to segment store if missing
*/
- @Test
+ @Test(timeOut = 30_000L)
public void testUploadToSegmentStore()
- throws HttpErrorStatusException, IOException, URISyntaxException {
+ throws HttpErrorStatusException, IOException, URISyntaxException, InterruptedException {
Review Comment:
```suggestion
throws HttpErrorStatusException, IOException, URISyntaxException {
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1405,48 +1414,77 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
continue;
}
// Skip the fix for the segment if it is already out of retention.
- if (retentionStrategy != null && retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
- LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention", segmentName);
+ if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+ LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention",
+ segmentName);
continue;
}
- LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
-
- // Find servers which have online replica
- List<URI> peerSegmentURIs =
- PeerServerSegmentFinder.getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
- if (peerSegmentURIs.isEmpty()) {
- throw new IllegalStateException(
- String.format("Failed to upload segment %s to deep store because no online replica is found",
- segmentName));
- }
-
- // Randomly ask one server to upload
- URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
- String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
- serverUploadRequestUrl =
- String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
- LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
- serverUploadRequestUrl);
- String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
- String segmentDownloadUrl =
- moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
- LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
- // Update segment ZK metadata by adding the download URL
- segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
- // TODO: add version check when persist segment ZK metadata
- persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
- LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
- segmentDownloadUrl);
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
} catch (Exception e) {
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
- LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);
+ LOGGER.warn("Failed checking segment deep store URL for segment {}", segmentName);
+ }
+
+ // Skip the fix if an upload is already queued for this segment
+ if (_deepStoreUploadExecutorPendingSegments.contains(segmentName)) {
+ continue;
}
+ _deepStoreUploadExecutorPendingSegments.add(segmentName);
Review Comment:
Should we also update the queue size here?
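One way to keep the queue-size gauge in step with the pending set is to refresh it wherever the set changes: when a segment is queued (the spot this comment points at) and when its upload finishes. A minimal sketch under that assumption; `PendingUploadGauge` is hypothetical, and a plain `AtomicLong` stands in for the `LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE` gauge instead of Pinot's metrics API:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the AtomicLong stands in for the
// LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE gauge; Pinot's metrics API is not used.
final class PendingUploadGauge {
  private final Set<String> _pendingSegments = ConcurrentHashMap.newKeySet();
  private final AtomicLong _queueSize = new AtomicLong();

  // Called where the segment is added to the pending set, before its upload task is submitted.
  boolean onQueued(String segmentName) {
    boolean added = _pendingSegments.add(segmentName);
    refresh();
    return added;
  }

  // Called when the upload task finishes, whether it succeeded or failed.
  void onFinished(String segmentName) {
    _pendingSegments.remove(segmentName);
    refresh();
  }

  long queueSize() {
    return _queueSize.get();
  }

  // Under concurrent updates the published value may briefly be stale,
  // which is usually acceptable for a gauge.
  private void refresh() {
    _queueSize.set(_pendingSegments.size());
  }
}
```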
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1405,48 +1414,77 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
continue;
}
// Skip the fix for the segment if it is already out of retention.
- if (retentionStrategy != null && retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
- LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention", segmentName);
+ if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+ LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention",
+ segmentName);
continue;
}
- LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
-
- // Find servers which have online replica
- List<URI> peerSegmentURIs =
- PeerServerSegmentFinder.getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
- if (peerSegmentURIs.isEmpty()) {
- throw new IllegalStateException(
- String.format("Failed to upload segment %s to deep store because no online replica is found",
- segmentName));
- }
-
- // Randomly ask one server to upload
- URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
- String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
- serverUploadRequestUrl =
- String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
- LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
- serverUploadRequestUrl);
- String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
- String segmentDownloadUrl =
- moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
- LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
- // Update segment ZK metadata by adding the download URL
- segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
- // TODO: add version check when persist segment ZK metadata
- persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
- LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
- segmentDownloadUrl);
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
} catch (Exception e) {
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
- LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);
+ LOGGER.warn("Failed checking segment deep store URL for segment {}", segmentName);
+ }
+
+ // Skip the fix if an upload is already queued for this segment
+ if (_deepStoreUploadExecutorPendingSegments.contains(segmentName)) {
+ continue;
}
+ _deepStoreUploadExecutorPendingSegments.add(segmentName);
Review Comment:
(minor)
```suggestion
// Skip the fix if an upload is already queued for this segment
if (!_deepStoreUploadExecutorPendingSegments.add(segmentName)) {
continue;
}
```
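The suggestion works because `Set.add` returns `false` when the element is already present, so one call both checks and records the segment; on a concurrent set such as `ConcurrentHashMap.newKeySet()` that call is also atomic, unlike a separate `contains` followed by `add`. A minimal sketch, where `queueNewSegments` is a hypothetical stand-in for the loop in `uploadToDeepStoreIfMissing`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class DeepStoreRetryQueueing {
  private DeepStoreRetryQueueing() {
  }

  // Hypothetical stand-in for the loop body: returns the segments that were newly queued.
  static List<String> queueNewSegments(List<String> candidates, Set<String> pendingSegments) {
    List<String> queued = new ArrayList<>();
    for (String segmentName : candidates) {
      // Skip the fix if an upload is already queued for this segment.
      if (!pendingSegments.add(segmentName)) {
        continue;
      }
      queued.add(segmentName);
    }
    return queued;
  }
}
```

In the real loop, the caller would submit an upload task for each newly queued segment instead of collecting names in a list.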
Re: [PR] make deep store upload retry async with configurable parallelism [pinot]
Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12017:
URL: https://github.com/apache/pinot/pull/12017#discussion_r1398260187
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -1035,6 +1036,13 @@ public void testUploadToSegmentStore()
// Verify the result
segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);
+
+ // Block until all tasks have been able to complete
+ ThreadPoolExecutor deepStoreUploadExecutor = segmentManager.getDeepStoreUploadExecutor();
+ while (deepStoreUploadExecutor.getActiveCount() > 0) {
Review Comment:
Seems the active count might not always be accurate (e.g. a task that has not been scheduled yet). Shall we wait on downloadUrl being set instead?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -194,6 +200,9 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
_isTmpSegmentAsyncDeletionEnabled = controllerConf.isTmpSegmentAsyncDeletionEnabled();
_deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs();
_fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ? initFileUploadDownloadClient() : null;
+ _deepStoreUploadExecutor = _isDeepStoreLLCSegmentUploadRetryEnabled ? initDeepStoreUploadExecutorService(
+ controllerConf.getDeepStoreRetryUploadParallelism()) : null;
+ _deepStoreUploadExecutorPendingSegments = _isDeepStoreLLCSegmentUploadRetryEnabled ? new HashSet<>() : null;
Review Comment:
We need a thread-safe set here.
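The pending set is written by the caller that queues uploads and cleared by the executor threads that run them, so a plain `HashSet` is not safe here. A minimal sketch assuming `ConcurrentHashMap.newKeySet()` as the thread-safe set; `submitUpload` and the `Runnable` upload action are hypothetical stand-ins for the PR's upload task. Removing the entry in a `finally` block keeps failed uploads from leaving stale entries that would block later retries:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

final class DeepStoreUploadTasks {
  private DeepStoreUploadTasks() {
  }

  // Thread-safe set: written by the caller that queues uploads, cleared by executor threads.
  static Set<String> newPendingSet() {
    return ConcurrentHashMap.newKeySet();
  }

  // Hypothetical: queues `upload` for `segmentName` unless an upload is already pending.
  static boolean submitUpload(ExecutorService executor, Set<String> pendingSegments, String segmentName,
      Runnable upload) {
    if (!pendingSegments.add(segmentName)) {
      return false;
    }
    try {
      // submit() records any exception in the returned Future (discarded here);
      // real code would log failures inside the task.
      executor.submit(() -> {
        try {
          upload.run();
        } finally {
          // Clear the entry even when the upload throws, so a later retry round can queue it again.
          pendingSegments.remove(segmentName);
        }
      });
    } catch (RejectedExecutionException e) {
      // The task never ran, so undo the bookkeeping before propagating.
      pendingSegments.remove(segmentName);
      throw e;
    }
    return true;
  }
}
```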
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +915,9 @@ public void testCommitSegmentMetadata() {
/**
* Test cases for fixing LLC segment by uploading to segment store if missing
*/
- @Test
+ @Test(timeOut = 30_000L)
Review Comment:
Do not use a timeout. We usually use `TestUtils.waitForCondition()` instead.
Re: [PR] make deep store upload retry async with configurable parallelism [pinot]
Posted by "itschrispeck (via GitHub)" <gi...@apache.org>.
itschrispeck commented on code in PR #12017:
URL: https://github.com/apache/pinot/pull/12017#discussion_r1402706477
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -1035,6 +1036,13 @@ public void testUploadToSegmentStore()
// Verify the result
segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);
+
+ // Block until all tasks have been able to complete
+ ThreadPoolExecutor deepStoreUploadExecutor = segmentManager.getDeepStoreUploadExecutor();
+ while (deepStoreUploadExecutor.getActiveCount() > 0) {
Review Comment:
Changed to use the pending segment set size, since waiting on downloadUrl makes it difficult to correctly test the error cases where we assert that downloadUrl isn't set.
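A minimal polling helper in the spirit of the `TestUtils.waitForCondition()` suggestion, used to wait for the pending segment set to drain instead of relying on a TestNG timeout. `WaitUtils.waitUntil` is hypothetical and does not reproduce the signature of Pinot's `TestUtils` method:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class WaitUtils {
  private WaitUtils() {
  }

  // Hypothetical helper: polls `condition` every `pollMs` until it holds or `timeoutMs` elapses.
  static void waitUntil(BooleanSupplier condition, long pollMs, long timeoutMs, String message) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (!condition.getAsBoolean()) {
      // Compare via subtraction so the check stays correct if nanoTime() overflows.
      if (System.nanoTime() - deadline >= 0) {
        throw new AssertionError("Timed out after " + timeoutMs + " ms: " + message);
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new AssertionError("Interrupted while waiting: " + message, e);
      }
    }
  }
}
```

With a hypothetical `pendingSegments` handle to the set, a test could call `WaitUtils.waitUntil(pendingSegments::isEmpty, 10, 30_000, "deep store uploads did not finish")` after `uploadToDeepStoreIfMissing(...)` and then assert on each segment's download URL, including the error cases where it must stay unset.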
Re: [PR] make deep store upload retry async with configurable parallelism [pinot]
Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12017:
URL: https://github.com/apache/pinot/pull/12017#issuecomment-1815610673
## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
Attention: `2 lines` in your changes are missing coverage. Please review.
> Comparison is base [(`2beb9a4`)](https://app.codecov.io/gh/apache/pinot/commit/2beb9a4938d7d7c9481fd2546075e2a2475fe0ec?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.61% compared to head [(`3bf17df`)](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 46.77%.
> Report is 19 commits behind head on master.
| [Files](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...g/apache/pinot/common/metrics/ControllerGauge.java](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyR2F1Z2UuamF2YQ==) | 0.00% | [2 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
<details><summary>Additional details and impacted files</summary>
```diff
@@ Coverage Diff @@
## master #12017 +/- ##
=============================================
- Coverage 61.61% 46.77% -14.84%
- Complexity 207 944 +737
=============================================
Files 2385 1787 -598
Lines 129214 93696 -35518
Branches 20003 15158 -4845
=============================================
- Hits 79613 43826 -35787
- Misses 43801 46750 +2949
+ Partials 5800 3120 -2680
```
| [Flag](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-11](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-21](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (-14.71%)` | :arrow_down: |
| [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (+19.19%)` | :arrow_up: |
| [temurin](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (-14.84%)` | :arrow_down: |
| [unittests](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (-14.84%)` | :arrow_down: |
| [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.77% <0.00%> (-0.17%)` | :arrow_down: |
| [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12017/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
</details>
[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12017?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
Re: [PR] make deep store upload retry async with configurable parallelism [pinot]
Posted by "itschrispeck (via GitHub)" <gi...@apache.org>.
itschrispeck commented on PR #12017:
URL: https://github.com/apache/pinot/pull/12017#issuecomment-1835253767
@Jackie-Jiang Thanks for the reviews! Addressed the remaining comments.