You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/26 16:42:20 UTC

[pinot] branch master updated: [multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module (#10791)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee6e137c0 [multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module (#10791)
5ee6e137c0 is described below

commit 5ee6e137c02e3e0a149a7c44dd6a9f5deb29cb5d
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Fri May 26 09:42:14 2023 -0700

    [multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module (#10791)
    
    
    - Refactor StageMetadata from `pinot-query-planner` to `pinot-query-runtime` module
    - Make `QueryRequest` carry a list of `StagePlan`s.
    - Make `QueryDispatcher` send only one request per `QueryServerInstance` per `StageId`.
    - Make `QueryPlanSerDeUtils` serialize each `PlanFragment` once, together with the metadata of all its workers, instead of once per worker.
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 pinot-common/src/main/proto/worker.proto           | 13 +++--
 .../query/planner/DispatchablePlanFragment.java    |  5 --
 .../apache/pinot/query/runtime/QueryRunner.java    |  2 +-
 .../query/runtime/plan/DistributedStagePlan.java   |  1 -
 .../runtime/plan/OpChainExecutionContext.java      |  1 -
 .../query/runtime/plan/PlanRequestContext.java     |  1 -
 .../pinot/query/runtime/plan}/StageMetadata.java   | 10 +++-
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    | 66 ++++++++++++++++------
 .../plan/server/ServerPlanRequestContext.java      |  2 +-
 .../apache/pinot/query/service/QueryServer.java    | 50 +++++++++-------
 .../query/service/dispatch/QueryDispatcher.java    | 50 +++++++---------
 .../pinot/query/runtime/QueryRunnerTestBase.java   | 12 +++-
 .../operator/MailboxReceiveOperatorTest.java       |  2 +-
 .../runtime/operator/MailboxSendOperatorTest.java  |  2 +-
 .../pinot/query/runtime/operator/OpChainTest.java  |  2 +-
 .../operator/SortedMailboxReceiveOperatorTest.java |  2 +-
 .../pinot/query/service/QueryServerTest.java       | 14 ++---
 17 files changed, 137 insertions(+), 98 deletions(-)

diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index c1942f685b..dfb1cd53eb 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -56,10 +56,10 @@ message CancelResponse {
   // intentionally left empty
 }
 
-// QueryRequest is the dispatched content for a specific query stage on a specific worker.
+// QueryRequest is the dispatched content for all query stages to a physical worker.
 message QueryRequest {
-  map<string, string> metadata = 1;
-  StagePlan stagePlan = 2;
+  repeated StagePlan stagePlan = 1;
+  map<string, string> metadata = 2;
 }
 
 // QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status.
@@ -70,14 +70,15 @@ message QueryResponse {
 
 message StagePlan {
   int32 stageId = 1;
-  string virtualAddress = 2;
-  StageNode stageRoot = 3;
-  StageMetadata stageMetadata = 4;
+  StageNode stageRoot = 2;
+  StageMetadata stageMetadata = 3;
 }
 
 message StageMetadata {
   repeated WorkerMetadata workerMetadata = 1;
   map<string, string> customProperty = 2;
+  string serverAddress = 3;
+  repeated int32 workerIds = 4;
 }
 
 message WorkerMetadata {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java
index c06fa383a6..e0bc3f6a45 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.WorkerMetadata;
 
 
@@ -108,10 +107,6 @@ public class DispatchablePlanFragment {
     _workerMetadataList.addAll(workerMetadataList);
   }
 
-  public StageMetadata toStageMetadata() {
-    return new StageMetadata(_workerMetadataList, _customProperties);
-  }
-
   public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap) {
     _serverInstanceToWorkerIdMap.clear();
     _serverInstanceToWorkerIdMap.putAll(serverInstanceToWorkerIdMap);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 92bf32638d..94640ffae7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -42,7 +42,6 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
@@ -56,6 +55,7 @@ import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
 import org.apache.pinot.query.runtime.plan.PlanRequestContext;
 import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index 7f4e3015f7..f8a5f5118b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.runtime.plan;
 
 import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index e45db904df..e16795891b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.plan;
 
 import java.util.function.Consumer;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.operator.OpChainStats;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index d3d890d9d5..d0233c1578 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.plan;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
similarity index 90%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
index 16d0da897b..2ad9df403d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.routing;
+package org.apache.pinot.query.runtime.plan;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.routing.WorkerMetadata;
 
 
 /**
@@ -31,7 +32,7 @@ public class StageMetadata {
   private final List<WorkerMetadata> _workerMetadataList;
   private final Map<String, String> _customProperties;
 
-  public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+  StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
     _workerMetadataList = workerMetadataList;
     _customProperties = customProperties;
   }
@@ -71,6 +72,11 @@ public class StageMetadata {
       return this;
     }
 
+    public Builder addCustomProperties(Map<String, String> customPropertyMap) {
+      _customProperties.putAll(customPropertyMap);
+      return this;
+    }
+
     public StageMetadata build() {
       return new StageMetadata(_workerMetadataList, _customProperties);
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 62b2af1ab4..cd4c1d6fd7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -24,44 +24,51 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.planner.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.DispatchableSubPlan;
 import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 
 
 /**
  * This utility class serialize/deserialize between {@link Worker.StagePlan} elements to Planner elements.
  */
 public class QueryPlanSerDeUtils {
+  private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
+      "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");
 
   private QueryPlanSerDeUtils() {
     // do not instantiate.
   }
 
-  public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
-    DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
-    distributedStagePlan.setServer(protoToAddress(stagePlan.getVirtualAddress()));
-    distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
-    distributedStagePlan.setStageMetadata(fromProtoStageMetadata(stagePlan.getStageMetadata()));
-    return distributedStagePlan;
+  public static List<DistributedStagePlan> deserializeStagePlan(Worker.QueryRequest request) {
+    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
+    for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+      distributedStagePlans.addAll(deserializeStagePlan(stagePlan));
+    }
+    return distributedStagePlans;
   }
 
-  public static Worker.StagePlan serialize(DistributedStagePlan distributedStagePlan) {
+  public static Worker.StagePlan serialize(DispatchableSubPlan dispatchableSubPlan, int stageId,
+      QueryServerInstance queryServerInstance, List<Integer> workerIds) {
     return Worker.StagePlan.newBuilder()
-        .setStageId(distributedStagePlan.getStageId())
-        .setVirtualAddress(addressToProto(distributedStagePlan.getServer()))
-        .setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) distributedStagePlan.getStageRoot()))
-        .setStageMetadata(toProtoStageMetadata(distributedStagePlan.getStageMetadata())).build();
+        .setStageId(stageId)
+        .setStageRoot(StageNodeSerDeUtils.serializeStageNode(
+            (AbstractPlanNode) dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment()
+                .getFragmentRoot()))
+        .setStageMetadata(
+            toProtoStageMetadata(dispatchableSubPlan.getQueryStageList().get(stageId), queryServerInstance, workerIds))
+        .build();
   }
 
-  private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
-      "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");
-
   public static VirtualServerAddress protoToAddress(String virtualAddressStr) {
     Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(virtualAddressStr);
     if (!matcher.matches()) {
@@ -79,6 +86,25 @@ public class QueryPlanSerDeUtils {
     return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port());
   }
 
+  private static List<DistributedStagePlan> deserializeStagePlan(Worker.StagePlan stagePlan) {
+    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
+    String serverAddress = stagePlan.getStageMetadata().getServerAddress();
+    String[] hostPort = StringUtils.split(serverAddress, ':');
+    String hostname = hostPort[0];
+    int port = Integer.parseInt(hostPort[1]);
+    AbstractPlanNode stageRoot = StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot());
+    StageMetadata stageMetadata = fromProtoStageMetadata(stagePlan.getStageMetadata());
+    for (int workerId : stagePlan.getStageMetadata().getWorkerIdsList()) {
+      DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
+      VirtualServerAddress virtualServerAddress = new VirtualServerAddress(hostname, port, workerId);
+      distributedStagePlan.setServer(virtualServerAddress);
+      distributedStagePlan.setStageRoot(stageRoot);
+      distributedStagePlan.setStageMetadata(stageMetadata);
+      distributedStagePlans.add(distributedStagePlan);
+    }
+    return distributedStagePlans;
+  }
+
   private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
     StageMetadata.Builder builder = new StageMetadata.Builder();
     List<WorkerMetadata> workerMetadataList = new ArrayList<>();
@@ -119,12 +145,16 @@ public class QueryPlanSerDeUtils {
     return mailboxMetadata;
   }
 
-  private static Worker.StageMetadata toProtoStageMetadata(StageMetadata stageMetadata) {
+  private static Worker.StageMetadata toProtoStageMetadata(DispatchablePlanFragment planFragment,
+      QueryServerInstance queryServerInstance, List<Integer> workerIds) {
     Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
-    for (WorkerMetadata workerMetadata : stageMetadata.getWorkerMetadataList()) {
+    for (WorkerMetadata workerMetadata : planFragment.getWorkerMetadataList()) {
       builder.addWorkerMetadata(toProtoWorkerMetadata(workerMetadata));
     }
-    builder.putAllCustomProperty(stageMetadata.getCustomProperties());
+    builder.putAllCustomProperty(planFragment.getCustomProperties());
+    builder.setServerAddress(String.format("%s:%d", queryServerInstance.getHostname(),
+        queryServerInstance.getQueryMailboxPort()));
+    builder.addAllWorkerIds(workerIds);
     return builder.build();
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 1c0f7168ff..87bf3302b5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -22,9 +22,9 @@ import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.spi.config.table.TableType;
 
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index d88b87220e..126273d310 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -22,10 +22,13 @@ import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
+import java.util.List;
 import java.util.Map;
-import org.apache.pinot.common.exception.QueryException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -45,12 +48,19 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   private static final int MAX_INBOUND_MESSAGE_SIZE = 64 * 1024 * 1024;
 
   private final int _port;
-  private Server _server = null;
   private final QueryRunner _queryRunner;
+  // The query submission executor service is only used for plan submission for now.
+  // TODO: once query submission logic becomes more complex, allow the submission result to be returned
+  //   asynchronously instead of returning directly from the submission response observer.
+  private final ExecutorService _querySubmissionExecutorService;
+
+  private Server _server = null;
 
   public QueryServer(int port, QueryRunner queryRunner) {
     _port = port;
     _queryRunner = queryRunner;
+    _querySubmissionExecutorService = Executors.newCachedThreadPool(
+        new NamedThreadFactory("query_submission_executor_on_" + _port + "_port"));
   }
 
   public void start() {
@@ -83,32 +93,30 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   @Override
   public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
     // Deserialize the request
-    DistributedStagePlan distributedStagePlan;
+    List<DistributedStagePlan> distributedStagePlans;
     Map<String, String> requestMetadataMap;
-    long requestId = -1;
+    requestMetadataMap = request.getMetadataMap();
+    long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     try {
-      distributedStagePlan = QueryPlanSerDeUtils.deserialize(request.getStagePlan());
-      requestMetadataMap = request.getMetadataMap();
-      requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+      distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request);
     } catch (Exception e) {
       LOGGER.error("Caught exception while deserializing the request: {}, payload: {}", requestId, request, e);
       responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
       return;
     }
-
-    try {
-      _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
-      responseObserver.onNext(Worker.QueryResponse.newBuilder()
-          .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK, "").build());
-      responseObserver.onCompleted();
-    } catch (Throwable t) {
-      LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId,
-          distributedStagePlan.getStageId(), t);
-      responseObserver.onNext(Worker.QueryResponse.newBuilder()
-          .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR, QueryException.getTruncatedStackTrace(t))
-          .build());
-      responseObserver.onCompleted();
-    }
+    // TODO: allow thrown exceptions to be returned to the broker asynchronously.
+    distributedStagePlans.forEach(distributedStagePlan -> _querySubmissionExecutorService.submit(() -> {
+          try {
+            _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+          } catch (Throwable t) {
+            LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId,
+                distributedStagePlan.getStageId(), t);
+          }
+        })
+    );
+    responseObserver.onNext(Worker.QueryResponse.newBuilder()
+        .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK, "").build());
+    responseObserver.onCompleted();
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index aa700a2e51..d7eb25a944 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -45,6 +45,7 @@ import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.core.util.trace.TracedThreadFactory;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.DispatchableSubPlan;
 import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
@@ -56,8 +57,8 @@ import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.OpChainStats;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -139,22 +140,21 @@ public class QueryDispatcher {
         for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry
             : dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap().entrySet()) {
           QueryServerInstance queryServerInstance = queryServerEntry.getKey();
-          for (int workerId : queryServerEntry.getValue()) {
-            String host = queryServerInstance.getHostname();
-            int servicePort = queryServerInstance.getQueryServicePort();
-            int mailboxPort = queryServerInstance.getQueryMailboxPort();
-            VirtualServerAddress virtualServerAddress = new VirtualServerAddress(host, mailboxPort, workerId);
-            DispatchClient client = getOrCreateDispatchClient(host, servicePort);
-            dispatchCalls++;
-            int finalStageId = stageId;
-            _executorService.submit(() -> client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-                        QueryPlanSerDeUtils.serialize(
-                            constructDistributedStagePlan(dispatchableSubPlan, finalStageId, virtualServerAddress)))
-                    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
-                    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs))
-                    .putAllMetadata(queryOptions).build(), finalStageId, queryServerInstance, deadline,
-                dispatchCallbacks::offer));
-          }
+          Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder();
+          String host = queryServerInstance.getHostname();
+          int servicePort = queryServerInstance.getQueryServicePort();
+          queryRequestBuilder.addStagePlan(
+              QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, queryServerInstance,
+                  queryServerEntry.getValue()));
+          dispatchCalls++;
+          Worker.QueryRequest queryRequest =
+              queryRequestBuilder.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
+                  .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs))
+                  .putAllMetadata(queryOptions).build();
+          DispatchClient client = getOrCreateDispatchClient(host, servicePort);
+          int finalStageId = stageId;
+          _executorService.submit(() -> client.submit(queryRequest, finalStageId, queryServerInstance, deadline,
+              dispatchCallbacks::offer));
         }
       }
     }
@@ -190,14 +190,14 @@ public class QueryDispatcher {
   public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatchableSubPlan, int reduceStageId,
       long timeoutMs,
       MailboxService mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean traceEnabled) {
-    MailboxReceiveNode reduceNode =
-        (MailboxReceiveNode) dispatchableSubPlan.getQueryStageList().get(reduceStageId).getPlanFragment()
-            .getFragmentRoot();
+    DispatchablePlanFragment reduceStagePlanFragment = dispatchableSubPlan.getQueryStageList().get(reduceStageId);
+    MailboxReceiveNode reduceNode = (MailboxReceiveNode) reduceStagePlanFragment.getPlanFragment().getFragmentRoot();
     VirtualServerAddress server = new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), 0);
     OpChainExecutionContext context =
         new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs,
             System.currentTimeMillis() + timeoutMs,
-            dispatchableSubPlan.getQueryStageList().get(reduceStageId).toStageMetadata(),
+            new StageMetadata.Builder().setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList())
+                .addCustomProperties(reduceStagePlanFragment.getCustomProperties()).build(),
             traceEnabled);
     MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(context, reduceNode.getSenderStageId());
     List<DataBlock> resultDataBlocks =
@@ -207,14 +207,6 @@ public class QueryDispatcher {
         dispatchableSubPlan.getQueryStageList().get(0).getPlanFragment().getFragmentRoot().getDataSchema());
   }
 
-  @VisibleForTesting
-  public static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,
-      int stageId, VirtualServerAddress serverAddress) {
-    return new DistributedStagePlan(stageId, serverAddress,
-        dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(),
-        dispatchableSubPlan.getQueryStageList().get(stageId).toStageMetadata());
-  }
-
   private static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs,
       @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap,
       DispatchableSubPlan dispatchableSubPlan,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 32164207d0..190bac2573 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -52,6 +52,7 @@ import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -134,13 +135,22 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
     for (Map.Entry<QueryServerInstance, List<Integer>> entry : serverInstanceToWorkerIdMap.entrySet()) {
       QueryServerInstance server = entry.getKey();
       for (int workerId : entry.getValue()) {
-        DistributedStagePlan distributedStagePlan = QueryDispatcher.constructDistributedStagePlan(
+        DistributedStagePlan distributedStagePlan = constructDistributedStagePlan(
             dispatchableSubPlan, stageId, new VirtualServerAddress(server, workerId));
         _servers.get(server).processQuery(distributedStagePlan, requestMetadataMap);
       }
     }
   }
 
+  protected static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,
+      int stageId, VirtualServerAddress serverAddress) {
+    return new DistributedStagePlan(stageId, serverAddress,
+        dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(),
+        new StageMetadata.Builder().setWorkerMetadataList(
+                dispatchableSubPlan.getQueryStageList().get(stageId).getWorkerMetadataList())
+            .addCustomProperties(dispatchableSubPlan.getQueryStageList().get(stageId).getCustomProperties()).build());
+  }
+
   protected List<Object[]> queryH2(String sql)
       throws Exception {
     int firstSemi = sql.indexOf(';');
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index bda313fa73..bcec439843 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -31,12 +31,12 @@ import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 774393945e..ade517f1a1 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -23,13 +23,13 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 01f3dc8715..4af33067fa 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -42,13 +42,13 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 4c2ebc5d00..7ed0c2469a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -35,12 +35,12 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index d9803acc17..ca24f97043 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.service;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -40,12 +41,10 @@ import org.apache.pinot.query.planner.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.DispatchableSubPlan;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
-import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.EqualityUtils;
@@ -124,7 +123,9 @@ public class QueryServerTest extends QueryTestSet {
 
         DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
 
-        StageMetadata stageMetadata = dispatchablePlanFragment.toStageMetadata();
+        StageMetadata stageMetadata = new StageMetadata.Builder()
+            .setWorkerMetadataList(dispatchablePlanFragment.getWorkerMetadataList())
+            .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();
 
         // ensure mock query runner received correctly deserialized payload.
         QueryRunner mockRunner =
@@ -230,9 +231,8 @@ public class QueryServerTest extends QueryTestSet {
     QueryServerInstance serverInstance = serverInstanceToWorkerIdMap.keySet().iterator().next();
     int workerId = serverInstanceToWorkerIdMap.get(serverInstance).get(0);
 
-    return Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(
-            QueryDispatcher.constructDistributedStagePlan(dispatchableSubPlan, stageId,
-                new VirtualServerAddress(serverInstance, workerId))))
+    return Worker.QueryRequest.newBuilder().addStagePlan(
+            QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, serverInstance, ImmutableList.of(workerId)))
         // the default configurations that must exist.
         .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()))
         .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org