Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2024/02/03 23:42:52 UTC

[PR] [Multi-stage] Ser/de stage plan in parallel [pinot]

Jackie-Jiang opened a new pull request, #12363:
URL: https://github.com/apache/pinot/pull/12363

   Make stage plan serialization (on the broker) and deserialization (on the server) execute in parallel.
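To illustrate the idea, here is a minimal Java sketch of per-stage parallel ser/de. It assumes each stage plan can be serialized and deserialized independently of the others. The `ParallelStageSerDe` class, the `mapInParallel` helper, and the `String`/`byte[]` stand-ins for stage plans are hypothetical; the change itself works on protobuf `Worker.StagePlan` messages through `QueryPlanSerDeUtils.deserializeStagePlan`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class ParallelStageSerDe {
  // Hypothetical helper: applies `fn` to every input on `executor`, returning results in input order.
  public static <I, O> List<O> mapInParallel(List<I> inputs, Function<I, O> fn, ExecutorService executor) {
    List<CompletableFuture<O>> futures = new ArrayList<>(inputs.size());
    for (I input : inputs) {
      // One async task per stage plan, so independent stages are processed concurrently.
      futures.add(CompletableFuture.supplyAsync(() -> fn.apply(input), executor));
    }
    List<O> results = new ArrayList<>(futures.size());
    for (CompletableFuture<O> future : futures) {
      // join() waits for the task and rethrows any failure as a CompletionException.
      results.add(future.join());
    }
    return results;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      List<String> stagePlans = List.of("stage-0", "stage-1", "stage-2");
      // Stand-in for broker-side serialization of each stage plan.
      List<byte[]> serialized = mapInParallel(stagePlans, s -> s.getBytes(StandardCharsets.UTF_8), executor);
      // Stand-in for server-side deserialization of each stage plan.
      List<String> deserialized = mapInParallel(serialized, b -> new String(b, StandardCharsets.UTF_8), executor);
      System.out.println(deserialized);
    } finally {
      executor.shutdown();
    }
  }
}
```

Running `main` prints `[stage-0, stage-1, stage-2]`. Results come back in input order because the futures are joined in creation order, even when the tasks themselves finish out of order.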


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] [Multi-stage] Ser/de stage plan in parallel [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #12363:
URL: https://github.com/apache/pinot/pull/12363


Re: [PR] [Multi-stage] Ser/de stage plan in parallel [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12363:
URL: https://github.com/apache/pinot/pull/12363#issuecomment-1925498994

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: `63 lines` in your changes are missing coverage. Please review.
   > Comparison is base [(`c9a82c4`)](https://app.codecov.io/gh/apache/pinot/commit/c9a82c40a2c8bed5e86d8278e0bb57bfc5bee86f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.68% compared to head [(`f623eb4`)](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 27.69%.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...apache/pinot/query/service/server/QueryServer.java](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9zZXJ2ZXIvUXVlcnlTZXJ2ZXIuamF2YQ==) | 0.00% | [33 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../pinot/query/service/dispatch/QueryDispatcher.java](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9RdWVyeURpc3BhdGNoZXIuamF2YQ==) | 0.00% | [22 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../query/runtime/plan/serde/QueryPlanSerDeUtils.java](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3NlcmRlL1F1ZXJ5UGxhblNlckRlVXRpbHMuamF2YQ==) | 0.00% | [8 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12363       +/-   ##
   =============================================
   - Coverage     61.68%   27.69%   -34.00%     
   + Complexity      207      201        -6     
   =============================================
     Files          2426     2426               
     Lines        132674   132673        -1     
     Branches      20502    20506        +4     
   =============================================
   - Hits          81838    36738    -45100     
   - Misses        44832    93151    +48319     
   + Partials       6004     2784     -3220     
   ```
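The totals above are internally consistent: in each column, hits + misses + partials equals the `Lines` row, and coverage is hits divided by that total, with partial lines counted as uncovered (this matches Codecov's documented coverage ratio). A small Java check of that arithmetic, using a hypothetical `CoverageMath` class:

```java
import java.util.Locale;

public class CoverageMath {
  // Line coverage as a percentage: hits / (hits + misses + partials); partials count as uncovered.
  public static double coveragePercent(long hits, long misses, long partials) {
    long lines = hits + misses + partials;
    return 100.0 * hits / lines;
  }

  public static void main(String[] args) {
    // Locale.ROOT keeps '.' as the decimal separator regardless of the JVM's default locale.
    System.out.printf(Locale.ROOT, "master: %.2f%%%n", coveragePercent(81838, 44832, 6004));
    System.out.printf(Locale.ROOT, "#12363: %.2f%%%n", coveragePercent(36738, 93151, 2784));
  }
}
```

Running it prints `master: 61.68%` and `#12363: 27.69%`, matching the report header.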
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-33.88%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-33.98%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-34.00%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-33.99%)` | :arrow_down: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12363/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.69% <0.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12363?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   


Re: [PR] [Multi-stage] Ser/de stage plan in parallel [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12363:
URL: https://github.com/apache/pinot/pull/12363#discussion_r1477143458


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -97,42 +97,60 @@ public void shutdown() {
 
   @Override
   public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
-    // Deserialize the request
-    List<DistributedStagePlan> distributedStagePlans;
-    Map<String, String> requestMetadata;
-    requestMetadata = Collections.unmodifiableMap(request.getMetadataMap());
+    Map<String, String> requestMetadata = request.getMetadataMap();
     long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
-    // 1. Deserialized request
-    try {
-      distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request);
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while deserializing the request: {}", requestId, e);
-      responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
-      return;
-    }
-    // 2. Submit distributed stage plans, await response successful or any failure which cancels all other tasks.
-    int numSubmission = distributedStagePlans.size();
-    CompletableFuture<?>[] submissionStubs = new CompletableFuture[numSubmission];
-    for (int i = 0; i < numSubmission; i++) {
-      DistributedStagePlan distributedStagePlan = distributedStagePlans.get(i);
-      submissionStubs[i] =
-          CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadata),
-              _querySubmissionExecutorService);
+
+    List<Worker.StagePlan> stagePlans = request.getStagePlanList();
+    int numStages = stagePlans.size();
+    CompletableFuture<?>[] stageSubmissionStubs = new CompletableFuture[numStages];
+    List<CompletableFuture<Void>> queryExecutionStubs = Collections.synchronizedList(new ArrayList<>());
+    for (int i = 0; i < numStages; i++) {
+      Worker.StagePlan stagePlan = stagePlans.get(i);
+      stageSubmissionStubs[i] = CompletableFuture.runAsync(() -> {
+        List<DistributedStagePlan> workerPlans;
+        try {
+          workerPlans = QueryPlanSerDeUtils.deserializeStagePlan(stagePlan);
+        } catch (Exception e) {
+          throw new RuntimeException(
+              String.format("Caught exception while deserializing stage plan for request: %d, stage id: %d", requestId,
+                  stagePlan.getStageId()), e);
+        }
+        int numWorkers = workerPlans.size();
+        CompletableFuture<?>[] workerSubmissionStubs = new CompletableFuture[numWorkers];
+        for (DistributedStagePlan workerPlan : workerPlans) {
+          queryExecutionStubs.add(
+              CompletableFuture.runAsync(() -> _queryRunner.processQuery(workerPlan, requestMetadata),
+                  _querySubmissionExecutorService));
+        }
+        try {
+          CompletableFuture.allOf(workerSubmissionStubs)
+              .get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+          throw new RuntimeException(
+              String.format("Caught exception while submitting request: %d, stage id: %d", requestId,
+                  stagePlan.getStageId()), e);
+        } finally {
+          for (CompletableFuture<?> future : workerSubmissionStubs) {
+            if (!future.isDone()) {
+              future.cancel(true);
+            }
+          }
+        }
+      }, _querySubmissionExecutorService);
     }
     try {
-      CompletableFuture.allOf(submissionStubs).get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      CompletableFuture.allOf(stageSubmissionStubs).get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

Review Comment:
   Looks like there are some issues with the submission stubs causing an NPE. PTAL
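A minimal sketch of the failure and one possible fix, assuming plain `Runnable` tasks in place of `_queryRunner.processQuery(...)`. In the quoted diff, `workerSubmissionStubs` is allocated with `numWorkers` slots, but the loop only appends to `queryExecutionStubs`, so every slot stays `null`. `CompletableFuture.allOf` throws `NullPointerException` on a `null` element, and `future.isDone()` in the `finally` block would do the same. The `WorkerSubmission` class and `submitAll` method are hypothetical and are not the code that was eventually merged.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WorkerSubmission {
  // Hypothetical helper: runs one task per worker on `executor` and waits until `deadlineMs`
  // (epoch millis). Every future is stored in its array slot, so allOf() never sees a null.
  public static void submitAll(List<Runnable> workers, ExecutorService executor, long deadlineMs)
      throws Exception {
    CompletableFuture<?>[] stubs = new CompletableFuture<?>[workers.size()];
    for (int i = 0; i < stubs.length; i++) {
      stubs[i] = CompletableFuture.runAsync(workers.get(i), executor);
    }
    try {
      CompletableFuture.allOf(stubs).get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    } finally {
      // On timeout or failure, cancel the stubs that have not completed yet.
      for (CompletableFuture<?> stub : stubs) {
        if (!stub.isDone()) {
          stub.cancel(true);
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Reproduces the reported NPE: the array is allocated but no future is ever stored in it.
    CompletableFuture<?>[] unfilled = new CompletableFuture<?>[2];
    try {
      CompletableFuture.allOf(unfilled);
      System.out.println("no exception");
    } catch (NullPointerException e) {
      System.out.println("allOf on unfilled array threw NullPointerException");
    }

    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      List<Runnable> workers = List.of(() -> System.out.println("worker 0"), () -> System.out.println("worker 1"));
      submitAll(workers, executor, System.currentTimeMillis() + 1_000);
      System.out.println("all workers finished");
    } finally {
      executor.shutdown();
    }
  }
}
```

Note that `cancel(true)` on a `CompletableFuture` only completes it with a `CancellationException`; the JDK documents that `mayInterruptIfRunning` has no effect there, so a task that is already running keeps running.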


