Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2024/02/03 23:42:52 UTC
[PR] [Multi-stage] Ser/de stage plan in parallel [pinot]
Jackie-Jiang opened a new pull request, #12363:
URL: https://github.com/apache/pinot/pull/12363
Make stage plan serialization (on the broker) and deserialization (on the server) execute in parallel.
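As a rough illustration of the idea (not the code in this PR), the sketch below converts each stage on an executor instead of in a single sequential loop, then waits for every result against a shared deadline. The class `ParallelSerDeSketch`, the method `convertInParallel`, and the use of `String::length` as a stand-in for the real ser/de call are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ParallelSerDeSketch {
  // Hypothetical helper: applies `serDe` to every stage on `executor` and collects the
  // results in input order, failing if the shared deadline passes first.
  public static <S, T> List<T> convertInParallel(List<S> stages, Function<S, T> serDe,
      ExecutorService executor, long timeoutMs) throws Exception {
    long deadlineMs = System.currentTimeMillis() + timeoutMs;
    List<CompletableFuture<T>> futures = new ArrayList<>(stages.size());
    for (S stage : stages) {
      // One task per stage, so the conversions can run concurrently.
      futures.add(CompletableFuture.supplyAsync(() -> serDe.apply(stage), executor));
    }
    List<T> results = new ArrayList<>(stages.size());
    try {
      for (CompletableFuture<T> future : futures) {
        long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
        results.add(future.get(remainingMs, TimeUnit.MILLISECONDS));
      }
    } finally {
      // On failure or timeout, cancel whatever is still pending.
      for (CompletableFuture<T> future : futures) {
        if (!future.isDone()) {
          future.cancel(true);
        }
      }
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      // String::length stands in for a real serialize/deserialize function.
      System.out.println(convertInParallel(List.of("stage-0", "stage-1-longer"), String::length, executor, 1000));
    } finally {
      executor.shutdown();
    }
  }
}
```

Waiting on each future with the remaining time, rather than a fixed per-task timeout, keeps the total wait bounded by the one deadline.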
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org
Re: [PR] [Multi-stage] Ser/de stage plan in parallel [pinot]
Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #12363:
URL: https://github.com/apache/pinot/pull/12363
Re: [PR] [Multi-stage] Ser/de stage plan in parallel [pinot]
Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12363:
URL: https://github.com/apache/pinot/pull/12363#issuecomment-1925498994
## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
Attention: `63 lines` in your changes are missing coverage. Please review.
> Comparison is base [(`c9a82c4`)](https://app.codecov.io/gh/apache/pinot/commit/c9a82c40a2c8bed5e86d8278e0bb57bfc5bee86f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.68% compared to head [(`f623eb4`)](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 27.69%.
| [Files](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...apache/pinot/query/service/server/QueryServer.java](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9zZXJ2ZXIvUXVlcnlTZXJ2ZXIuamF2YQ==) | 0.00% | [33 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [.../pinot/query/service/dispatch/QueryDispatcher.java](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9RdWVyeURpc3BhdGNoZXIuamF2YQ==) | 0.00% | [22 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [.../query/runtime/plan/serde/QueryPlanSerDeUtils.java](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3NlcmRlL1F1ZXJ5UGxhblNlckRlVXRpbHMuamF2YQ==) | 0.00% | [8 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
<details><summary>Additional details and impacted files</summary>
```diff
@@              Coverage Diff              @@
##             master   #12363       +/-   ##
=============================================
- Coverage     61.68%   27.69%   -34.00%
+ Complexity      207      201        -6
=============================================
  Files          2426     2426
  Lines        132674   132673        -1
  Branches      20502    20506        +4
=============================================
- Hits          81838    36738    -45100
- Misses        44832    93151    +48319
+ Partials       6004     2784     -3220
```
| [Flag](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-11](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-21](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-33.88%)` | :arrow_down: |
| [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-33.98%)` | :arrow_down: |
| [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [temurin](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-34.00%)` | :arrow_down: |
| [unittests](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-33.99%)` | :arrow_down: |
| [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-0.01%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
</details>
Re: [PR] [Multi-stage] Ser/de stage plan in parallel [pinot]
Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12363:
URL: https://github.com/apache/pinot/pull/12363#discussion_r1477143458
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -97,42 +97,60 @@ public void shutdown() {
   @Override
   public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
-    // Deserialize the request
-    List<DistributedStagePlan> distributedStagePlans;
-    Map<String, String> requestMetadata;
-    requestMetadata = Collections.unmodifiableMap(request.getMetadataMap());
+    Map<String, String> requestMetadata = request.getMetadataMap();
     long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
-    // 1. Deserialized request
-    try {
-      distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request);
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while deserializing the request: {}", requestId, e);
-      responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
-      return;
-    }
-    // 2. Submit distributed stage plans, await response successful or any failure which cancels all other tasks.
-    int numSubmission = distributedStagePlans.size();
-    CompletableFuture<?>[] submissionStubs = new CompletableFuture[numSubmission];
-    for (int i = 0; i < numSubmission; i++) {
-      DistributedStagePlan distributedStagePlan = distributedStagePlans.get(i);
-      submissionStubs[i] =
-          CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadata),
-              _querySubmissionExecutorService);
+
+    List<Worker.StagePlan> stagePlans = request.getStagePlanList();
+    int numStages = stagePlans.size();
+    CompletableFuture<?>[] stageSubmissionStubs = new CompletableFuture[numStages];
+    List<CompletableFuture<Void>> queryExecutionStubs = Collections.synchronizedList(new ArrayList<>());
+    for (int i = 0; i < numStages; i++) {
+      Worker.StagePlan stagePlan = stagePlans.get(i);
+      stageSubmissionStubs[i] = CompletableFuture.runAsync(() -> {
+        List<DistributedStagePlan> workerPlans;
+        try {
+          workerPlans = QueryPlanSerDeUtils.deserializeStagePlan(stagePlan);
+        } catch (Exception e) {
+          throw new RuntimeException(
+              String.format("Caught exception while deserializing stage plan for request: %d, stage id: %d", requestId,
+                  stagePlan.getStageId()), e);
+        }
+        int numWorkers = workerPlans.size();
+        CompletableFuture<?>[] workerSubmissionStubs = new CompletableFuture[numWorkers];
+        for (DistributedStagePlan workerPlan : workerPlans) {
+          queryExecutionStubs.add(
+              CompletableFuture.runAsync(() -> _queryRunner.processQuery(workerPlan, requestMetadata),
+                  _querySubmissionExecutorService));
+        }
+        try {
+          CompletableFuture.allOf(workerSubmissionStubs)
+              .get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+          throw new RuntimeException(
+              String.format("Caught exception while submitting request: %d, stage id: %d", requestId,
+                  stagePlan.getStageId()), e);
+        } finally {
+          for (CompletableFuture<?> future : workerSubmissionStubs) {
+            if (!future.isDone()) {
+              future.cancel(true);
+            }
+          }
+        }
+      }, _querySubmissionExecutorService);
     }
     try {
-      CompletableFuture.allOf(submissionStubs).get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      CompletableFuture.allOf(stageSubmissionStubs).get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
Review Comment:
It looks like there are some NPE issues with the submission stubs. PTAL.
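One plausible reading of this comment, offered as an assumption since the comment does not spell it out: in the diff above, `workerSubmissionStubs` is allocated with `numWorkers` slots but the futures are added to `queryExecutionStubs` instead, so the array stays full of `null`s. `CompletableFuture.allOf` is documented to throw `NullPointerException` when any element is null. The sketch below reproduces that behaviour in isolation and shows one possible fix; the class and method names are hypothetical and nothing here is taken from the merged change.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmissionStubSketch {
  // Mirrors the suspected pattern: the array is sized but its slots are never assigned.
  public static boolean allOfThrowsOnUnfilledArray(int numWorkers) {
    CompletableFuture<?>[] stubs = new CompletableFuture[numWorkers];
    try {
      CompletableFuture.allOf(stubs);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // One possible fix (an assumption): store each future in its array slot before waiting.
  // Returns how many worker tasks ran.
  public static int submitAndAwait(int numWorkers, ExecutorService executor, long timeoutMs) throws Exception {
    AtomicInteger processed = new AtomicInteger();
    CompletableFuture<?>[] stubs = new CompletableFuture[numWorkers];
    for (int i = 0; i < numWorkers; i++) {
      // The counter increment stands in for the real per-worker submission.
      stubs[i] = CompletableFuture.runAsync(processed::incrementAndGet, executor);
    }
    try {
      CompletableFuture.allOf(stubs).get(timeoutMs, TimeUnit.MILLISECONDS);
    } finally {
      for (CompletableFuture<?> future : stubs) {
        if (!future.isDone()) {
          future.cancel(true);
        }
      }
    }
    return processed.get();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      System.out.println("NPE on unfilled array: " + allOfThrowsOnUnfilledArray(3));
      System.out.println("Workers processed: " + submitAndAwait(3, executor, 1000));
    } finally {
      executor.shutdown();
    }
  }
}
```

An empty array does not trigger the exception, so a stage with zero workers would not hit this path; the cleanup loop in the `finally` block would also fail on null slots if it were reached.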