Posted to commits@pinot.apache.org by "ankitsultana (via GitHub)" <gi...@apache.org> on 2023/05/24 19:40:21 UTC

[GitHub] [pinot] ankitsultana commented on a diff in pull request #10791: [multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module

ankitsultana commented on code in PR #10791:
URL: https://github.com/apache/pinot/pull/10791#discussion_r1204677238


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -51,12 +53,23 @@ public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
     return distributedStagePlan;
   }
 
-  public static Worker.StagePlan serialize(DistributedStagePlan distributedStagePlan) {
+  public static List<DistributedStagePlan> deserialize(Worker.QueryRequest request) {
+    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
+    for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+      distributedStagePlans.add(deserialize(stagePlan));
+    }
+    return distributedStagePlans;
+  }
+
+  public static Worker.StagePlan serialize(DispatchableSubPlan dispatchableSubPlan, int stageId,
+      VirtualServerAddress serverAddress) {

Review Comment:
   I left a comment at the call site of this `serialize` method: it ends up being called (num-workers * num-stages) times per query.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -139,22 +140,24 @@ int submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeout
         for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry
             : dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap().entrySet()) {
           QueryServerInstance queryServerInstance = queryServerEntry.getKey();
+          Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder();
+          String host = queryServerInstance.getHostname();
+          int servicePort = queryServerInstance.getQueryServicePort();
+          int mailboxPort = queryServerInstance.getQueryMailboxPort();
           for (int workerId : queryServerEntry.getValue()) {
-            String host = queryServerInstance.getHostname();
-            int servicePort = queryServerInstance.getQueryServicePort();
-            int mailboxPort = queryServerInstance.getQueryMailboxPort();
             VirtualServerAddress virtualServerAddress = new VirtualServerAddress(host, mailboxPort, workerId);
-            DispatchClient client = getOrCreateDispatchClient(host, servicePort);
             dispatchCalls++;
-            int finalStageId = stageId;
-            _executorService.submit(() -> client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-                        QueryPlanSerDeUtils.serialize(
-                            constructDistributedStagePlan(dispatchableSubPlan, finalStageId, virtualServerAddress)))
-                    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
-                    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs))
-                    .putAllMetadata(queryOptions).build(), finalStageId, queryServerInstance, deadline,
-                dispatchCallbacks::offer));
+            queryRequestBuilder.addStagePlan(
+                QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, virtualServerAddress));

Review Comment:
   This serializes the entire sub plan once for every (worker, stageId) combination. Large use-cases typically have 256 partitions (i.e. workers) and around 10 stages.

   That means `serialize` is called roughly 256 * 10, i.e. ~2,500 times per query. For low-QPS use-cases this should be fine, but higher-QPS use-cases might start getting bottlenecked on it.

   @walterddr : I remember we discussed this a couple of months ago. Are we planning to fix this anytime soon? cc: @xiangfu0
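
To make the cost concrete, here is a minimal, self-contained sketch of the alternative: serialize the worker-independent part once per stage and reuse it for every worker. None of these names are Pinot APIs. `SerializedStage`, `WorkerRequest`, `serializeStage` and `buildRequests` are hypothetical stand-ins, and the sketch assumes the stage plan can be split into a part shared by all workers and a small per-worker part (here just the worker id), which may not hold for the real plan as-is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PerStageSerializationSketch {
  /** Hypothetical stand-in for the worker-independent serialized form of one stage. */
  static final class SerializedStage {
    final int stageId;
    final String payload;

    SerializedStage(int stageId, String payload) {
      this.stageId = stageId;
      this.payload = payload;
    }
  }

  /** Hypothetical per-worker message: shared stage payload plus the worker id. */
  static final class WorkerRequest {
    final SerializedStage stage;
    final int workerId;

    WorkerRequest(SerializedStage stage, int workerId) {
      this.stage = stage;
      this.workerId = workerId;
    }
  }

  /** Counts how often the (assumed expensive) serialization step runs. */
  static final AtomicInteger SERIALIZE_CALLS = new AtomicInteger();

  /** Placeholder for the expensive step; a real implementation would build the proto here. */
  static SerializedStage serializeStage(int stageId) {
    SERIALIZE_CALLS.incrementAndGet();
    return new SerializedStage(stageId, "plan-for-stage-" + stageId);
  }

  /** Builds one request per (stage, worker) pair, but serializes each stage only once. */
  static List<WorkerRequest> buildRequests(int numStages, int numWorkers) {
    List<WorkerRequest> requests = new ArrayList<>(numStages * numWorkers);
    for (int stageId = 0; stageId < numStages; stageId++) {
      // Hoisted out of the worker loop: one serialization per stage.
      SerializedStage shared = serializeStage(stageId);
      for (int workerId = 0; workerId < numWorkers; workerId++) {
        requests.add(new WorkerRequest(shared, workerId));
      }
    }
    return requests;
  }

  public static void main(String[] args) {
    List<WorkerRequest> requests = buildRequests(10, 256);
    // Prints "2560 requests, 10 serialize calls"
    System.out.println(requests.size() + " requests, " + SERIALIZE_CALLS.get() + " serialize calls");
  }
}
```

With 10 stages and 256 workers this still produces 2,560 requests, but the serialization step runs 10 times instead of 2,560. Whether the real plan can be split this way depends on how much worker-specific state it carries.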



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

