You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/26 16:42:20 UTC
[pinot] branch master updated: [multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module (#10791)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5ee6e137c0 [multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module (#10791)
5ee6e137c0 is described below
commit 5ee6e137c02e3e0a149a7c44dd6a9f5deb29cb5d
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Fri May 26 09:42:14 2023 -0700
[multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module (#10791)
- Refactor StageMetadata from `pinot-query-planner` to `pinot-query-runtime` module
- Make `QueryRequest` carry a list of `StagePlan`s.
- Make `QueryDispatcher` send only one request per `QueryServerInstance` per `StageId`.
- Make `QueryPlanSerDeUtils` serialize a `PlanFragment` once, together with the metadata for all of its workers, instead of once per worker.
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
pinot-common/src/main/proto/worker.proto | 13 +++--
.../query/planner/DispatchablePlanFragment.java | 5 --
.../apache/pinot/query/runtime/QueryRunner.java | 2 +-
.../query/runtime/plan/DistributedStagePlan.java | 1 -
.../runtime/plan/OpChainExecutionContext.java | 1 -
.../query/runtime/plan/PlanRequestContext.java | 1 -
.../pinot/query/runtime/plan}/StageMetadata.java | 10 +++-
.../runtime/plan/serde/QueryPlanSerDeUtils.java | 66 ++++++++++++++++------
.../plan/server/ServerPlanRequestContext.java | 2 +-
.../apache/pinot/query/service/QueryServer.java | 50 +++++++++-------
.../query/service/dispatch/QueryDispatcher.java | 50 +++++++---------
.../pinot/query/runtime/QueryRunnerTestBase.java | 12 +++-
.../operator/MailboxReceiveOperatorTest.java | 2 +-
.../runtime/operator/MailboxSendOperatorTest.java | 2 +-
.../pinot/query/runtime/operator/OpChainTest.java | 2 +-
.../operator/SortedMailboxReceiveOperatorTest.java | 2 +-
.../pinot/query/service/QueryServerTest.java | 14 ++---
17 files changed, 137 insertions(+), 98 deletions(-)
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index c1942f685b..dfb1cd53eb 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -56,10 +56,10 @@ message CancelResponse {
// intentionally left empty
}
-// QueryRequest is the dispatched content for a specific query stage on a specific worker.
+// QueryRequest is the dispatched content for all query stages to a physical worker.
message QueryRequest {
- map<string, string> metadata = 1;
- StagePlan stagePlan = 2;
+ repeated StagePlan stagePlan = 1;
+ map<string, string> metadata = 2;
}
// QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status.
@@ -70,14 +70,15 @@ message QueryResponse {
message StagePlan {
int32 stageId = 1;
- string virtualAddress = 2;
- StageNode stageRoot = 3;
- StageMetadata stageMetadata = 4;
+ StageNode stageRoot = 2;
+ StageMetadata stageMetadata = 3;
}
message StageMetadata {
repeated WorkerMetadata workerMetadata = 1;
map<string, string> customProperty = 2;
+ string serverAddress = 3;
+ repeated int32 workerIds = 4;
}
message WorkerMetadata {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java
index c06fa383a6..e0bc3f6a45 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -108,10 +107,6 @@ public class DispatchablePlanFragment {
_workerMetadataList.addAll(workerMetadataList);
}
- public StageMetadata toStageMetadata() {
- return new StageMetadata(_workerMetadataList, _customProperties);
- }
-
public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap) {
_serverInstanceToWorkerIdMap.clear();
_serverInstanceToWorkerIdMap.putAll(serverInstanceToWorkerIdMap);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 92bf32638d..94640ffae7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -42,7 +42,6 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
@@ -56,6 +55,7 @@ import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
import org.apache.pinot.query.runtime.plan.PlanRequestContext;
import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index 7f4e3015f7..f8a5f5118b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -19,7 +19,6 @@
package org.apache.pinot.query.runtime.plan;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index e45db904df..e16795891b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.plan;
import java.util.function.Consumer;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.operator.OpChainStats;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index d3d890d9d5..d0233c1578 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.plan;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
similarity index 90%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
index 16d0da897b..2ad9df403d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.routing;
+package org.apache.pinot.query.runtime.plan;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.routing.WorkerMetadata;
/**
@@ -31,7 +32,7 @@ public class StageMetadata {
private final List<WorkerMetadata> _workerMetadataList;
private final Map<String, String> _customProperties;
- public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+ StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
_workerMetadataList = workerMetadataList;
_customProperties = customProperties;
}
@@ -71,6 +72,11 @@ public class StageMetadata {
return this;
}
+ public Builder addCustomProperties(Map<String, String> customPropertyMap) {
+ _customProperties.putAll(customPropertyMap);
+ return this;
+ }
+
public StageMetadata build() {
return new StageMetadata(_workerMetadataList, _customProperties);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 62b2af1ab4..cd4c1d6fd7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -24,44 +24,51 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.planner.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
/**
* This utility class serialize/deserialize between {@link Worker.StagePlan} elements to Planner elements.
*/
public class QueryPlanSerDeUtils {
+ private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
+ "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");
private QueryPlanSerDeUtils() {
// do not instantiate.
}
- public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
- DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
- distributedStagePlan.setServer(protoToAddress(stagePlan.getVirtualAddress()));
- distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
- distributedStagePlan.setStageMetadata(fromProtoStageMetadata(stagePlan.getStageMetadata()));
- return distributedStagePlan;
+ public static List<DistributedStagePlan> deserializeStagePlan(Worker.QueryRequest request) {
+ List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
+ for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+ distributedStagePlans.addAll(deserializeStagePlan(stagePlan));
+ }
+ return distributedStagePlans;
}
- public static Worker.StagePlan serialize(DistributedStagePlan distributedStagePlan) {
+ public static Worker.StagePlan serialize(DispatchableSubPlan dispatchableSubPlan, int stageId,
+ QueryServerInstance queryServerInstance, List<Integer> workerIds) {
return Worker.StagePlan.newBuilder()
- .setStageId(distributedStagePlan.getStageId())
- .setVirtualAddress(addressToProto(distributedStagePlan.getServer()))
- .setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) distributedStagePlan.getStageRoot()))
- .setStageMetadata(toProtoStageMetadata(distributedStagePlan.getStageMetadata())).build();
+ .setStageId(stageId)
+ .setStageRoot(StageNodeSerDeUtils.serializeStageNode(
+ (AbstractPlanNode) dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment()
+ .getFragmentRoot()))
+ .setStageMetadata(
+ toProtoStageMetadata(dispatchableSubPlan.getQueryStageList().get(stageId), queryServerInstance, workerIds))
+ .build();
}
- private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
- "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");
-
public static VirtualServerAddress protoToAddress(String virtualAddressStr) {
Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(virtualAddressStr);
if (!matcher.matches()) {
@@ -79,6 +86,25 @@ public class QueryPlanSerDeUtils {
return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port());
}
+ private static List<DistributedStagePlan> deserializeStagePlan(Worker.StagePlan stagePlan) {
+ List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
+ String serverAddress = stagePlan.getStageMetadata().getServerAddress();
+ String[] hostPort = StringUtils.split(serverAddress, ':');
+ String hostname = hostPort[0];
+ int port = Integer.parseInt(hostPort[1]);
+ AbstractPlanNode stageRoot = StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot());
+ StageMetadata stageMetadata = fromProtoStageMetadata(stagePlan.getStageMetadata());
+ for (int workerId : stagePlan.getStageMetadata().getWorkerIdsList()) {
+ DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
+ VirtualServerAddress virtualServerAddress = new VirtualServerAddress(hostname, port, workerId);
+ distributedStagePlan.setServer(virtualServerAddress);
+ distributedStagePlan.setStageRoot(stageRoot);
+ distributedStagePlan.setStageMetadata(stageMetadata);
+ distributedStagePlans.add(distributedStagePlan);
+ }
+ return distributedStagePlans;
+ }
+
private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
StageMetadata.Builder builder = new StageMetadata.Builder();
List<WorkerMetadata> workerMetadataList = new ArrayList<>();
@@ -119,12 +145,16 @@ public class QueryPlanSerDeUtils {
return mailboxMetadata;
}
- private static Worker.StageMetadata toProtoStageMetadata(StageMetadata stageMetadata) {
+ private static Worker.StageMetadata toProtoStageMetadata(DispatchablePlanFragment planFragment,
+ QueryServerInstance queryServerInstance, List<Integer> workerIds) {
Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
- for (WorkerMetadata workerMetadata : stageMetadata.getWorkerMetadataList()) {
+ for (WorkerMetadata workerMetadata : planFragment.getWorkerMetadataList()) {
builder.addWorkerMetadata(toProtoWorkerMetadata(workerMetadata));
}
- builder.putAllCustomProperty(stageMetadata.getCustomProperties());
+ builder.putAllCustomProperty(planFragment.getCustomProperties());
+ builder.setServerAddress(String.format("%s:%d", queryServerInstance.getHostname(),
+ queryServerInstance.getQueryMailboxPort()));
+ builder.addAllWorkerIds(workerIds);
return builder.build();
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 1c0f7168ff..87bf3302b5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -22,9 +22,9 @@ import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.spi.config.table.TableType;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index d88b87220e..126273d310 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -22,10 +22,13 @@ import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import java.util.List;
import java.util.Map;
-import org.apache.pinot.common.exception.QueryException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -45,12 +48,19 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
private static final int MAX_INBOUND_MESSAGE_SIZE = 64 * 1024 * 1024;
private final int _port;
- private Server _server = null;
private final QueryRunner _queryRunner;
+ // query submission service is only used for plan submission for now.
+ // TODO: with complex query submission logic we should allow asynchronous query submission return instead of
+ // directly return from submission response observer.
+ private final ExecutorService _querySubmissionExecutorService;
+
+ private Server _server = null;
public QueryServer(int port, QueryRunner queryRunner) {
_port = port;
_queryRunner = queryRunner;
+ _querySubmissionExecutorService = Executors.newCachedThreadPool(
+ new NamedThreadFactory("query_submission_executor_on_" + _port + "_port"));
}
public void start() {
@@ -83,32 +93,30 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
@Override
public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
// Deserialize the request
- DistributedStagePlan distributedStagePlan;
+ List<DistributedStagePlan> distributedStagePlans;
Map<String, String> requestMetadataMap;
- long requestId = -1;
+ requestMetadataMap = request.getMetadataMap();
+ long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
try {
- distributedStagePlan = QueryPlanSerDeUtils.deserialize(request.getStagePlan());
- requestMetadataMap = request.getMetadataMap();
- requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+ distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request);
} catch (Exception e) {
LOGGER.error("Caught exception while deserializing the request: {}, payload: {}", requestId, request, e);
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
return;
}
-
- try {
- _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
- responseObserver.onNext(Worker.QueryResponse.newBuilder()
- .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK, "").build());
- responseObserver.onCompleted();
- } catch (Throwable t) {
- LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId,
- distributedStagePlan.getStageId(), t);
- responseObserver.onNext(Worker.QueryResponse.newBuilder()
- .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR, QueryException.getTruncatedStackTrace(t))
- .build());
- responseObserver.onCompleted();
- }
+ // TODO: allow thrown exception to return back to broker in asynchronous manner.
+ distributedStagePlans.forEach(distributedStagePlan -> _querySubmissionExecutorService.submit(() -> {
+ try {
+ _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+ } catch (Throwable t) {
+ LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId,
+ distributedStagePlan.getStageId(), t);
+ }
+ })
+ );
+ responseObserver.onNext(Worker.QueryResponse.newBuilder()
+ .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK, "").build());
+ responseObserver.onCompleted();
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index aa700a2e51..d7eb25a944 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -45,6 +45,7 @@ import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
@@ -56,8 +57,8 @@ import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.OpChainStats;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.utils.ByteArray;
@@ -139,22 +140,21 @@ public class QueryDispatcher {
for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry
: dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap().entrySet()) {
QueryServerInstance queryServerInstance = queryServerEntry.getKey();
- for (int workerId : queryServerEntry.getValue()) {
- String host = queryServerInstance.getHostname();
- int servicePort = queryServerInstance.getQueryServicePort();
- int mailboxPort = queryServerInstance.getQueryMailboxPort();
- VirtualServerAddress virtualServerAddress = new VirtualServerAddress(host, mailboxPort, workerId);
- DispatchClient client = getOrCreateDispatchClient(host, servicePort);
- dispatchCalls++;
- int finalStageId = stageId;
- _executorService.submit(() -> client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
- QueryPlanSerDeUtils.serialize(
- constructDistributedStagePlan(dispatchableSubPlan, finalStageId, virtualServerAddress)))
- .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
- .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs))
- .putAllMetadata(queryOptions).build(), finalStageId, queryServerInstance, deadline,
- dispatchCallbacks::offer));
- }
+ Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder();
+ String host = queryServerInstance.getHostname();
+ int servicePort = queryServerInstance.getQueryServicePort();
+ queryRequestBuilder.addStagePlan(
+ QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, queryServerInstance,
+ queryServerEntry.getValue()));
+ dispatchCalls++;
+ Worker.QueryRequest queryRequest =
+ queryRequestBuilder.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
+ .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs))
+ .putAllMetadata(queryOptions).build();
+ DispatchClient client = getOrCreateDispatchClient(host, servicePort);
+ int finalStageId = stageId;
+ _executorService.submit(() -> client.submit(queryRequest, finalStageId, queryServerInstance, deadline,
+ dispatchCallbacks::offer));
}
}
}
@@ -190,14 +190,14 @@ public class QueryDispatcher {
public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatchableSubPlan, int reduceStageId,
long timeoutMs,
MailboxService mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean traceEnabled) {
- MailboxReceiveNode reduceNode =
- (MailboxReceiveNode) dispatchableSubPlan.getQueryStageList().get(reduceStageId).getPlanFragment()
- .getFragmentRoot();
+ DispatchablePlanFragment reduceStagePlanFragment = dispatchableSubPlan.getQueryStageList().get(reduceStageId);
+ MailboxReceiveNode reduceNode = (MailboxReceiveNode) reduceStagePlanFragment.getPlanFragment().getFragmentRoot();
VirtualServerAddress server = new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), 0);
OpChainExecutionContext context =
new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs,
System.currentTimeMillis() + timeoutMs,
- dispatchableSubPlan.getQueryStageList().get(reduceStageId).toStageMetadata(),
+ new StageMetadata.Builder().setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList())
+ .addCustomProperties(reduceStagePlanFragment.getCustomProperties()).build(),
traceEnabled);
MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(context, reduceNode.getSenderStageId());
List<DataBlock> resultDataBlocks =
@@ -207,14 +207,6 @@ public class QueryDispatcher {
dispatchableSubPlan.getQueryStageList().get(0).getPlanFragment().getFragmentRoot().getDataSchema());
}
- @VisibleForTesting
- public static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,
- int stageId, VirtualServerAddress serverAddress) {
- return new DistributedStagePlan(stageId, serverAddress,
- dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(),
- dispatchableSubPlan.getQueryStageList().get(stageId).toStageMetadata());
- }
-
private static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs,
@Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap,
DispatchableSubPlan dispatchableSubPlan,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 32164207d0..190bac2573 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -52,6 +52,7 @@ import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.data.FieldSpec;
@@ -134,13 +135,22 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
for (Map.Entry<QueryServerInstance, List<Integer>> entry : serverInstanceToWorkerIdMap.entrySet()) {
QueryServerInstance server = entry.getKey();
for (int workerId : entry.getValue()) {
- DistributedStagePlan distributedStagePlan = QueryDispatcher.constructDistributedStagePlan(
+ DistributedStagePlan distributedStagePlan = constructDistributedStagePlan(
dispatchableSubPlan, stageId, new VirtualServerAddress(server, workerId));
_servers.get(server).processQuery(distributedStagePlan, requestMetadataMap);
}
}
}
+ protected static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,
+ int stageId, VirtualServerAddress serverAddress) {
+ return new DistributedStagePlan(stageId, serverAddress,
+ dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(),
+ new StageMetadata.Builder().setWorkerMetadataList(
+ dispatchableSubPlan.getQueryStageList().get(stageId).getWorkerMetadataList())
+ .addCustomProperties(dispatchableSubPlan.getQueryStageList().get(stageId).getCustomProperties()).build());
+ }
+
protected List<Object[]> queryH2(String sql)
throws Exception {
int firstSemi = sql.indexOf(';');
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index bda313fa73..bcec439843 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -31,12 +31,12 @@ import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 774393945e..ade517f1a1 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -23,13 +23,13 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 01f3dc8715..4af33067fa 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -42,13 +42,13 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.spi.data.FieldSpec;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 4c2ebc5d00..7ed0c2469a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -35,12 +35,12 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index d9803acc17..ca24f97043 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.service;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -40,12 +41,10 @@ import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
-import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.EqualityUtils;
@@ -124,7 +123,9 @@ public class QueryServerTest extends QueryTestSet {
DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
- StageMetadata stageMetadata = dispatchablePlanFragment.toStageMetadata();
+ StageMetadata stageMetadata = new StageMetadata.Builder()
+ .setWorkerMetadataList(dispatchablePlanFragment.getWorkerMetadataList())
+ .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();
// ensure mock query runner received correctly deserialized payload.
QueryRunner mockRunner =
@@ -230,9 +231,8 @@ public class QueryServerTest extends QueryTestSet {
QueryServerInstance serverInstance = serverInstanceToWorkerIdMap.keySet().iterator().next();
int workerId = serverInstanceToWorkerIdMap.get(serverInstance).get(0);
- return Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(
- QueryDispatcher.constructDistributedStagePlan(dispatchableSubPlan, stageId,
- new VirtualServerAddress(serverInstance, workerId))))
+ return Worker.QueryRequest.newBuilder().addStagePlan(
+ QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, serverInstance, ImmutableList.of(workerId)))
// the default configurations that must exist.
.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()))
.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org