Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2024/02/03 01:02:54 UTC

[PR] [Multi-stage] Optimize query dispatch [pinot]

Jackie-Jiang opened a new pull request, #12358:
URL: https://github.com/apache/pinot/pull/12358

   - Perform the server-independent serialization only once per stage
   - Send one plan per server instead of one plan per stage per server
   - Execute the server-dependent serialization in parallel
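
The three points above can be illustrated with a minimal, JDK-only sketch. It is not the code in this PR: `DispatchSketch`, `serializeStage`, and `buildRequests` are hypothetical names, and plain strings stand in for the protobuf plan objects. It assumes the expensive part of serialization (the stage's plan tree) does not depend on the target server, while the worker assignment does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class DispatchSketch {
  // Counts how often the server-independent serialization runs.
  static final AtomicInteger STAGE_SERIALIZATIONS = new AtomicInteger();

  // Hypothetical stand-in for serializing a stage's plan tree.
  static String serializeStage(int stageId) {
    STAGE_SERIALIZATIONS.incrementAndGet();
    return "stage-" + stageId;
  }

  // Input: stageId -> (server -> worker IDs). Output: server -> one request holding all of its stages.
  static Map<String, List<String>> buildRequests(Map<Integer, Map<String, List<Integer>>> stageToServerWorkers) {
    // Server-independent part: done once per stage, regardless of how many servers run it.
    Map<Integer, String> serializedStages = new TreeMap<>();
    Set<String> servers = new TreeSet<>();
    for (Map.Entry<Integer, Map<String, List<Integer>>> entry : stageToServerWorkers.entrySet()) {
      serializedStages.put(entry.getKey(), serializeStage(entry.getKey()));
      servers.addAll(entry.getValue().keySet());
    }

    // Server-dependent part: one task per server, executed in parallel.
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      Map<String, CompletableFuture<List<String>>> futures = new TreeMap<>();
      for (String server : servers) {
        futures.put(server, CompletableFuture.supplyAsync(() -> {
          List<String> stagePlans = new ArrayList<>();
          for (Map.Entry<Integer, String> stage : serializedStages.entrySet()) {
            List<Integer> workerIds = stageToServerWorkers.get(stage.getKey()).get(server);
            if (workerIds != null) {
              // Reuse the shared serialized stage; only attach the server-specific worker IDs.
              stagePlans.add(stage.getValue() + ":workers=" + workerIds);
            }
          }
          return stagePlans;
        }, executor));
      }
      // Wait for every per-server request to be built.
      Map<String, List<String>> requests = new TreeMap<>();
      futures.forEach((server, future) -> requests.put(server, future.join()));
      return requests;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    Map<Integer, Map<String, List<Integer>>> plan = new HashMap<>();
    plan.put(1, Map.of("server-a", List.of(0), "server-b", List.of(1)));
    plan.put(2, Map.of("server-a", List.of(0, 1)));
    System.out.println(buildRequests(plan));
    System.out.println("stage serializations: " + STAGE_SERIALIZATIONS.get());
  }
}
```

With two stages spread over two servers, the sketch serializes each stage once and produces two requests (one per server), rather than three requests (one per stage per server).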


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] [Multi-stage] Optimize query dispatch [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #12358:
URL: https://github.com/apache/pinot/pull/12358




Re: [PR] [Multi-stage] Optimize query dispatch [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12358:
URL: https://github.com/apache/pinot/pull/12358#discussion_r1477085074


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -107,50 +110,76 @@ public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan d
   void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map<String, String> queryOptions)
       throws Exception {
     Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
-    BlockingQueue<AsyncQueryDispatchResponse> dispatchCallbacks = new LinkedBlockingQueue<>();
     List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
     int numStages = stagePlans.size();
-    int numDispatchCalls = 0;
-    // Do not submit the reduce stage (stage 0)
+    Set<QueryServerInstance> serverInstances = new HashSet<>();
+    // TODO: If serialization is slow, consider serializing each stage in parallel
+    StageInfo[] stageInfoMap = new StageInfo[numStages];
+    // Ignore the reduce stage (stage 0)
     for (int stageId = 1; stageId < numStages; stageId++) {
-      for (Map.Entry<QueryServerInstance, List<Integer>> entry : stagePlans.get(stageId)
-          .getServerInstanceToWorkerIdMap().entrySet()) {
-        QueryServerInstance queryServerInstance = entry.getKey();
-        Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder();
-        queryRequestBuilder.addStagePlan(
-            QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, queryServerInstance, entry.getValue()));
-        Worker.QueryRequest queryRequest =
-            queryRequestBuilder.putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
-                    String.valueOf(requestId))
-                .putMetadata(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs))
-                .putAllMetadata(queryOptions).build();
-        DispatchClient client = getOrCreateDispatchClient(queryServerInstance);
-        int finalStageId = stageId;
-        _executorService.submit(
-            () -> client.submit(queryRequest, finalStageId, queryServerInstance, deadline, dispatchCallbacks::offer));
-        numDispatchCalls++;
-      }
+      DispatchablePlanFragment stagePlan = stagePlans.get(stageId);
+      serverInstances.addAll(stagePlan.getServerInstanceToWorkerIdMap().keySet());
+      Plan.StageNode rootNode =
+          StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) stagePlan.getPlanFragment().getFragmentRoot());
+      List<Worker.WorkerMetadata> workerMetadataList = QueryPlanSerDeUtils.toProtoWorkerMetadataList(stagePlan);
+      stageInfoMap[stageId] = new StageInfo(rootNode, workerMetadataList, stagePlan.getCustomProperties());
+    }
+    Map<String, String> requestMetadata = new HashMap<>();
+    requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId));
+    requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, Long.toString(timeoutMs));
+    requestMetadata.putAll(queryOptions);
+
+    // Submit the query plan to all servers in parallel
+    int numServers = serverInstances.size();
+    BlockingQueue<AsyncQueryDispatchResponse> dispatchCallbacks = new ArrayBlockingQueue<>(numServers);
+    for (QueryServerInstance serverInstance : serverInstances) {
+      _executorService.submit(() -> {

Review Comment:
   Note that `QueryServer` could apply a similar technique:
   
   https://github.com/apache/pinot/blob/0a4398634be81cdbbe891b3da249134ef98743e7/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java#L108-L123
   
   The deserialization (line 109) can be moved into the `runAsync` call (line 121).
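
A rough sketch of the idea in this comment, assuming the server currently deserializes the request on the calling thread before handing work to an executor. It is not the actual `QueryServer` code: `AsyncDeserializeSketch`, `deserialize`, and `handle` are hypothetical names, and a string stands in for the deserialized plan. Only `CompletableFuture.runAsync` is a real JDK call.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class AsyncDeserializeSketch {
  // Hypothetical stand-in for request deserialization; appends the name of the thread that ran it.
  static String deserialize(byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8) + "@" + Thread.currentThread().getName();
  }

  // The caller only schedules the task; deserialization happens inside runAsync on the executor.
  static CompletableFuture<Void> handle(byte[] payload, ExecutorService executor, Consumer<String> onPlan) {
    return CompletableFuture.runAsync(() -> {
      String plan = deserialize(payload);
      onPlan.accept(plan);
    }, executor);
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "query-worker"));
    try {
      byte[] payload = "plan".getBytes(StandardCharsets.UTF_8);
      handle(payload, executor, System.out::println).join();
    } finally {
      executor.shutdown();
    }
  }
}
```

The design point is that the thread receiving the request returns as soon as the task is scheduled, so a slow deserialization does not block it from accepting the next request.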





Re: [PR] [Multi-stage] Optimize query dispatch [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12358:
URL: https://github.com/apache/pinot/pull/12358#issuecomment-1925005610

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: `73 lines` in your changes are missing coverage. Please review.
   > Comparison is base [(`33074e1`)](https://app.codecov.io/gh/apache/pinot/commit/33074e1e7ec807251c8163d1807c61f5f80e8853?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.71% compared to head [(`9d24f79`)](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 0.00%.
   > Report is 4 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [.../pinot/query/service/dispatch/QueryDispatcher.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9RdWVyeURpc3BhdGNoZXIuamF2YQ==) | 0.00% | [51 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../query/runtime/plan/serde/QueryPlanSerDeUtils.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3NlcmRlL1F1ZXJ5UGxhblNlckRlVXRpbHMuamF2YQ==) | 0.00% | [13 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...pinot/query/service/dispatch/DispatchObserver.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9EaXNwYXRjaE9ic2VydmVyLmphdmE=) | 0.00% | [4 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...e/pinot/query/service/dispatch/DispatchClient.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9EaXNwYXRjaENsaWVudC5qYXZh) | 0.00% | [3 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...y/service/dispatch/AsyncQueryDispatchResponse.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9Bc3luY1F1ZXJ5RGlzcGF0Y2hSZXNwb25zZS5qYXZh) | 0.00% | [2 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12358       +/-   ##
   =============================================
   - Coverage     61.71%    0.00%   -61.72%     
   =============================================
     Files          2424     2350       -74     
     Lines        132512   128926     -3586     
     Branches      20481    19944      -537     
   =============================================
   - Hits          81782        0    -81782     
   - Misses        44732   128926    +84194     
   + Partials       5998        0     -5998     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-0.01%)` | :arrow_down: |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (ø)` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.60%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.57%)` | :arrow_down: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.72%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   




Re: [PR] [Multi-stage] Optimize query dispatch [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12358:
URL: https://github.com/apache/pinot/pull/12358#discussion_r1477120044


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########

Review Comment:
   Good point! I will do it in a separate PR.


