Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2024/02/03 01:02:54 UTC
[PR] [Multi-stage] Optimize query dispatch [pinot]
Jackie-Jiang opened a new pull request, #12358:
URL: https://github.com/apache/pinot/pull/12358
- Perform the server-independent serialization only once per stage
- Send one plan per server instead of one plan per stage per server
- Execute the server-dependent serialization in parallel
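The three changes above can be illustrated with a minimal Java sketch. It is not Pinot's `QueryDispatcher`: `DispatchSketch`, `Stage`, `StageInfo`, and the string-based "serialization" are hypothetical stand-ins, and the RPC is replaced by putting each server's payload on a queue. It shows the server-independent part computed once per stage, one request per server carrying every stage that runs on it, and the per-server work running in parallel on an executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the dispatch pattern; none of these types are Pinot classes. */
public class DispatchSketch {
  /** Counts server-independent serializations, for demonstration only. */
  static final AtomicInteger SERIALIZE_CALLS = new AtomicInteger();

  /** A non-reduce stage: its plan root plus the worker ids assigned to each server. */
  record Stage(String planRoot, Map<String, List<Integer>> serverToWorkerIds) {}

  /** Server-independent part of a stage, computed once per stage. */
  record StageInfo(String serializedRoot) {}

  static StageInfo serializeStage(Stage stage) {
    SERIALIZE_CALLS.incrementAndGet();
    return new StageInfo("root:" + stage.planRoot());
  }

  /** Sends one request per server and returns the stage payloads each server received. */
  static Map<String, List<String>> dispatch(List<Stage> stages, ExecutorService executor, long timeoutMs) {
    // 1. Server-independent serialization: once per stage, not once per (stage, server) pair.
    Set<String> servers = new TreeSet<>();
    StageInfo[] stageInfos = new StageInfo[stages.size()];
    for (int i = 0; i < stages.size(); i++) {
      stageInfos[i] = serializeStage(stages.get(i));
      servers.addAll(stages.get(i).serverToWorkerIds().keySet());
    }
    // 2. One task per server attaches the server-dependent worker ids in parallel.
    //    Each task reports exactly once, so the queue can be bounded by the server count.
    BlockingQueue<Map.Entry<String, List<String>>> responses =
        new ArrayBlockingQueue<>(Math.max(1, servers.size()));
    for (String server : servers) {
      executor.submit(() -> {
        List<String> request = new ArrayList<>();
        for (int i = 0; i < stages.size(); i++) {
          List<Integer> workerIds = stages.get(i).serverToWorkerIds().get(server);
          if (workerIds != null) {
            request.add(stageInfos[i].serializedRoot() + " workers=" + workerIds);
          }
        }
        responses.offer(Map.entry(server, request)); // stand-in for the actual RPC
      });
    }
    // 3. Wait for one response per server, bounded by an overall deadline.
    Map<String, List<String>> sent = new TreeMap<>();
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    try {
      for (int i = 0; i < servers.size(); i++) {
        Map.Entry<String, List<String>> response =
            responses.poll(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        if (response == null) {
          throw new IllegalStateException("Timed out waiting for dispatch responses");
        }
        sent.put(response.getKey(), response.getValue());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sent;
  }

  public static void main(String[] args) {
    List<Stage> stages = List.of(
        new Stage("scan", Map.of("server-a", List.of(0, 1), "server-b", List.of(2))),
        new Stage("join", Map.of("server-a", List.of(0))));
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      dispatch(stages, executor, 1_000).forEach((server, plans) -> System.out.println(server + " -> " + plans));
      System.out.println("stage serializations: " + SERIALIZE_CALLS.get());
    } finally {
      executor.shutdown();
    }
  }
}
```

In this example the two stages are serialized once each, and three (stage, server) pairs collapse into two requests because `server-a` receives both stages in a single payload.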
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org
Re: [PR] [Multi-stage] Optimize query dispatch [pinot]
Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #12358:
URL: https://github.com/apache/pinot/pull/12358
Re: [PR] [Multi-stage] Optimize query dispatch [pinot]
Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12358:
URL: https://github.com/apache/pinot/pull/12358#discussion_r1477085074
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -107,50 +110,76 @@ public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan d
   void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map<String, String> queryOptions)
       throws Exception {
     Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
-    BlockingQueue<AsyncQueryDispatchResponse> dispatchCallbacks = new LinkedBlockingQueue<>();
     List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
     int numStages = stagePlans.size();
-    int numDispatchCalls = 0;
-    // Do not submit the reduce stage (stage 0)
+    Set<QueryServerInstance> serverInstances = new HashSet<>();
+    // TODO: If serialization is slow, consider serializing each stage in parallel
+    StageInfo[] stageInfoMap = new StageInfo[numStages];
+    // Ignore the reduce stage (stage 0)
     for (int stageId = 1; stageId < numStages; stageId++) {
-      for (Map.Entry<QueryServerInstance, List<Integer>> entry : stagePlans.get(stageId)
-          .getServerInstanceToWorkerIdMap().entrySet()) {
-        QueryServerInstance queryServerInstance = entry.getKey();
-        Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder();
-        queryRequestBuilder.addStagePlan(
-            QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, queryServerInstance, entry.getValue()));
-        Worker.QueryRequest queryRequest =
-            queryRequestBuilder.putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
-                    String.valueOf(requestId))
-                .putMetadata(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs))
-                .putAllMetadata(queryOptions).build();
-        DispatchClient client = getOrCreateDispatchClient(queryServerInstance);
-        int finalStageId = stageId;
-        _executorService.submit(
-            () -> client.submit(queryRequest, finalStageId, queryServerInstance, deadline, dispatchCallbacks::offer));
-        numDispatchCalls++;
-      }
+      DispatchablePlanFragment stagePlan = stagePlans.get(stageId);
+      serverInstances.addAll(stagePlan.getServerInstanceToWorkerIdMap().keySet());
+      Plan.StageNode rootNode =
+          StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) stagePlan.getPlanFragment().getFragmentRoot());
+      List<Worker.WorkerMetadata> workerMetadataList = QueryPlanSerDeUtils.toProtoWorkerMetadataList(stagePlan);
+      stageInfoMap[stageId] = new StageInfo(rootNode, workerMetadataList, stagePlan.getCustomProperties());
+    }
+    Map<String, String> requestMetadata = new HashMap<>();
+    requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId));
+    requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, Long.toString(timeoutMs));
+    requestMetadata.putAll(queryOptions);
+
+    // Submit the query plan to all servers in parallel
+    int numServers = serverInstances.size();
+    BlockingQueue<AsyncQueryDispatchResponse> dispatchCallbacks = new ArrayBlockingQueue<>(numServers);
+    for (QueryServerInstance serverInstance : serverInstances) {
+      _executorService.submit(() -> {
Review Comment:
Note that `QueryServer` could also apply a similar technique:
https://github.com/apache/pinot/blob/0a4398634be81cdbbe891b3da249134ef98743e7/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java#L108-L123
The deserialization (line 109) can be moved into the `runAsync` call (line 121).
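The review suggestion (deserialize inside the `runAsync` task instead of on the calling thread) can be sketched in Java as follows. This is a hypothetical illustration, not the `QueryServer` code behind the link: `DeferredDeserializationSketch`, `deserialize`, and the semicolon-separated payload are made-up stand-ins for decoding the protobuf request into stage plans.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical sketch: deserialize a request inside the async task rather than on the calling thread. */
public class DeferredDeserializationSketch {
  /** Records which thread ran the deserialization, for demonstration only. */
  static final AtomicReference<String> DESERIALIZING_THREAD = new AtomicReference<>();

  /** Stand-in for decoding a request into stage plans (here: ';'-separated stage names). */
  static List<String> deserialize(byte[] payload) {
    DESERIALIZING_THREAD.set(Thread.currentThread().getName());
    return Arrays.asList(new String(payload, StandardCharsets.UTF_8).split(";"));
  }

  /**
   * Schedules deserialization and execution together on the executor, so the calling (RPC) thread
   * only enqueues work. A deserialization failure completes the returned future exceptionally.
   */
  static CompletableFuture<Void> submit(byte[] payload, Executor executor, Consumer<List<String>> runStages) {
    return CompletableFuture.runAsync(() -> runStages.accept(deserialize(payload)), executor);
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "query-runner"));
    try {
      List<String> executed = new CopyOnWriteArrayList<>();
      submit("stage1;stage2".getBytes(StandardCharsets.UTF_8), executor, executed::addAll).join();
      System.out.println(executed + " deserialized on " + DESERIALIZING_THREAD.get());
    } finally {
      executor.shutdown();
    }
  }
}
```

The trade-off of this design is that a deserialization error no longer surfaces as a synchronous throw: it completes the future exceptionally (so `join()` throws a `CompletionException`), and the caller has to observe the future, for example with `whenComplete`, to report the failure.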
Re: [PR] [Multi-stage] Optimize query dispatch [pinot]
Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12358:
URL: https://github.com/apache/pinot/pull/12358#issuecomment-1925005610
## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
Attention: `73 lines` in your changes are missing coverage. Please review.
> Comparison is base [(`33074e1`)](https://app.codecov.io/gh/apache/pinot/commit/33074e1e7ec807251c8163d1807c61f5f80e8853?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.71% compared to head [(`9d24f79`)](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 0.00%.
> Report is 4 commits behind head on master.
| [Files](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [.../pinot/query/service/dispatch/QueryDispatcher.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9RdWVyeURpc3BhdGNoZXIuamF2YQ==) | 0.00% | [51 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [.../query/runtime/plan/serde/QueryPlanSerDeUtils.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3NlcmRlL1F1ZXJ5UGxhblNlckRlVXRpbHMuamF2YQ==) | 0.00% | [13 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...pinot/query/service/dispatch/DispatchObserver.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9EaXNwYXRjaE9ic2VydmVyLmphdmE=) | 0.00% | [4 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...e/pinot/query/service/dispatch/DispatchClient.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9EaXNwYXRjaENsaWVudC5qYXZh) | 0.00% | [3 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...y/service/dispatch/AsyncQueryDispatchResponse.java](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9kaXNwYXRjaC9Bc3luY1F1ZXJ5RGlzcGF0Y2hSZXNwb25zZS5qYXZh) | 0.00% | [2 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
<details><summary>Additional details and impacted files</summary>
```diff
@@ Coverage Diff @@
## master #12358 +/- ##
=============================================
- Coverage 61.71% 0.00% -61.72%
=============================================
Files 2424 2350 -74
Lines 132512 128926 -3586
Branches 20481 19944 -537
=============================================
- Hits 81782 0 -81782
- Misses 44732 128926 +84194
+ Partials 5998 0 -5998
```
| [Flag](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-0.01%)` | :arrow_down: |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (ø)` | |
| [java-11](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-21](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.60%)` | :arrow_down: |
| [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.57%)` | :arrow_down: |
| [temurin](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.72%)` | :arrow_down: |
| [unittests](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12358/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
</details>
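For readers checking the numbers above: assuming Codecov's default project settings (coverage = hits / (hits + misses + partials), two decimal places, rounded down), the reported percentages can be reproduced from the "Coverage Diff" counts. `CoverageMath` is a hypothetical helper written for this illustration, not part of Codecov.

```java
/** Hypothetical helper; assumes Codecov's default precision (2) and rounding mode (down). */
public class CoverageMath {
  /** Returns coverage as a percentage string with two decimals, truncated (rounded down). */
  static String coverage(long hits, long misses, long partials) {
    long lines = hits + misses + partials; // partially covered lines count as not covered
    if (lines == 0) {
      return "0.00";
    }
    long hundredths = hits * 10_000 / lines; // integer division truncates, i.e. rounds down
    return String.format("%d.%02d", hundredths / 100, hundredths % 100);
  }

  public static void main(String[] args) {
    System.out.println("master: " + coverage(81_782, 44_732, 5_998) + "%"); // 132512 lines
    System.out.println("head:   " + coverage(0, 128_926, 0) + "%");         // 128926 lines
  }
}
```

With master's counts the unrounded value is about 61.717%, which rounds down to the reported 61.71%; the head commit has 0 hits and therefore 0.00%. Likewise, the per-file "Missing" counts (51 + 13 + 4 + 3 + 2) add up to the 73 uncovered patch lines.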
[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12358?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
Re: [PR] [Multi-stage] Optimize query dispatch [pinot]
Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12358:
URL: https://github.com/apache/pinot/pull/12358#discussion_r1477120044
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
Review Comment:
Good point! Will do it in a separate PR.