You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/08 23:32:04 UTC
(pinot) branch master updated: [Multi-stage] Optimize mailbox info in query plan (#12382)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2d41b3806e [Multi-stage] Optimize mailbox info in query plan (#12382)
2d41b3806e is described below
commit 2d41b3806e14b4d9822c6eddfa57acf613356db2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 8 15:31:58 2024 -0800
[Multi-stage] Optimize mailbox info in query plan (#12382)
---
pinot-common/src/main/proto/worker.proto | 25 ++-
.../explain/PhysicalExplainPlanVisitor.java | 35 +--
.../planner/physical/DispatchablePlanContext.java | 8 +-
.../planner/physical/DispatchablePlanMetadata.java | 8 +-
.../planner/physical/MailboxAssignmentVisitor.java | 124 ++++++-----
.../query/planner/physical/MailboxIdUtils.java | 43 ++--
.../{MailboxMetadata.java => MailboxInfo.java} | 39 ++--
.../apache/pinot/query/routing/MailboxInfos.java | 35 ++-
.../pinot/query/routing}/QueryPlanSerDeUtils.java | 64 +++---
.../apache/pinot/query/routing/RoutingInfo.java | 38 ++--
.../pinot/query/routing/SharedMailboxInfos.java | 38 ++--
.../apache/pinot/query/routing}/StageMetadata.java | 11 +-
.../org/apache/pinot/query/routing}/StagePlan.java | 10 +-
.../apache/pinot/query/routing/WorkerMetadata.java | 24 +--
.../apache/pinot/query/QueryCompilationTest.java | 239 ++++++++++-----------
.../apache/pinot/query/runtime/QueryRunner.java | 49 +++--
.../operator/BaseMailboxReceiveOperator.java | 10 +-
.../runtime/operator/MailboxSendOperator.java | 26 +--
.../runtime/plan/OpChainExecutionContext.java | 18 +-
.../plan/pipeline/PipelineBreakerExecutor.java | 6 +-
.../plan/server/ServerPlanRequestContext.java | 2 +-
.../plan/server/ServerPlanRequestUtils.java | 4 +-
.../query/service/dispatch/QueryDispatcher.java | 16 +-
.../pinot/query/service/server/QueryServer.java | 10 +-
.../apache/pinot/query/QueryServerEnclosure.java | 2 +-
.../executor/OpChainSchedulerServiceTest.java | 17 +-
.../operator/MailboxReceiveOperatorTest.java | 29 ++-
.../runtime/operator/MailboxSendOperatorTest.java | 21 +-
.../pinot/query/runtime/operator/OpChainTest.java | 58 ++---
.../query/runtime/operator/OperatorTestUtil.java | 17 +-
.../operator/SortedMailboxReceiveOperatorTest.java | 29 ++-
.../plan/pipeline/PipelineBreakerExecutorTest.java | 33 +--
.../query/runtime/queries/QueryRunnerTestBase.java | 10 +-
.../query/service/server/QueryServerTest.java | 18 +-
34 files changed, 543 insertions(+), 573 deletions(-)
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index b7e492fcc5..8ad277c96d 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -49,25 +49,30 @@ message QueryResponse {
}
message StagePlan {
- int32 stageId = 1;
- bytes rootNode = 2; // Serialized StageNode
- StageMetadata stageMetadata = 3;
+ bytes rootNode = 1; // Serialized StageNode
+ StageMetadata stageMetadata = 2;
}
message StageMetadata {
- repeated WorkerMetadata workerMetadata = 1;
- bytes customProperty = 2; // Serialized Properties
+ int32 stageId = 1;
+ repeated WorkerMetadata workerMetadata = 2;
+ bytes customProperty = 3; // Serialized Properties
}
message WorkerMetadata {
- string virtualAddress = 1;
- map<int32, MailboxMetadata> mailboxMetadata = 2;
+ int32 workedId = 1;
+ map<int32, bytes> mailboxInfos = 2; // Stage id to serialized MailboxInfos
map<string, string> customProperty = 3;
}
-message MailboxMetadata {
- repeated string mailboxId = 1;
- repeated string virtualAddress = 2;
+message MailboxInfos {
+ repeated MailboxInfo mailboxInfo = 1;
+}
+
+message MailboxInfo {
+ string hostname = 1;
+ int32 port = 2;
+ repeated int32 workerId = 3;
}
message Properties {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
index ea9bef1139..61dd1a23f0 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
@@ -18,12 +18,10 @@
*/
package org.apache.pinot.query.planner.explain;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
@@ -41,8 +39,8 @@ import org.apache.pinot.query.planner.plannode.SortNode;
import org.apache.pinot.query.planner.plannode.TableScanNode;
import org.apache.pinot.query.planner.plannode.ValueNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.routing.MailboxInfo;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.VirtualServerAddress;
/**
@@ -214,14 +212,13 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
appendInfo(node, context);
int receiverStageId = node.getReceiverStageId();
- List<VirtualServerAddress> serverAddressList =
+ List<MailboxInfo> receiverMailboxInfos =
_dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId()).getWorkerMetadataList()
- .get(context._workerId).getMailboxMetadataMap().get(receiverStageId).getVirtualAddresses();
- List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList);
+ .get(context._workerId).getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
context._builder.append("->");
- String receivers = serverInstanceToWorkerIdList.stream()
- .map(s -> "[" + receiverStageId + "]@" + s)
- .collect(Collectors.joining(",", "{", "}"));
+ // Sort to ensure print order
+ String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort))
+ .map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}"));
return context._builder.append(receivers);
}
@@ -276,24 +273,4 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
);
}
}
-
- public static List<String> stringifyVirtualServerAddresses(List<VirtualServerAddress> serverAddressList) {
- // using tree map to ensure print order.
- Map<QueryServerInstance, List<Integer>> serverToWorkerIdMap = new TreeMap<>(
- Comparator.comparing(QueryServerInstance::toString));
- for (VirtualServerAddress serverAddress : serverAddressList) {
- QueryServerInstance server = new QueryServerInstance(serverAddress.hostname(), serverAddress.port(), -1);
- List<Integer> workerIds = serverToWorkerIdMap.getOrDefault(server, new ArrayList<>());
- workerIds.add(serverAddress.workerId());
- serverToWorkerIdMap.put(server, workerIds);
- }
- return serverToWorkerIdMap.entrySet().stream()
- .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
- .collect(Collectors.toList());
- }
-
- public static String stringifyQueryServerInstanceToWorkerIdsEntry(Map.Entry<QueryServerInstance, List<Integer>> e) {
- QueryServerInstance server = e.getKey();
- return server.getHostname() + ":" + server.getQueryServicePort() + "|" + e.getValue();
- }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 22744dda0e..6d421e90d9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -29,9 +29,8 @@ import org.apache.calcite.util.Pair;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -100,7 +99,7 @@ public class DispatchablePlanContext {
dispatchablePlanMetadata.getWorkerIdToServerInstanceMap();
Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap =
dispatchablePlanMetadata.getWorkerIdToSegmentsMap();
- Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailboxesMap =
+ Map<Integer, Map<Integer, MailboxInfos>> workerIdToMailboxesMap =
dispatchablePlanMetadata.getWorkerIdToMailboxesMap();
Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdsMap = new HashMap<>();
WorkerMetadata[] workerMetadataArray = new WorkerMetadata[workerIdToServerInstanceMap.size()];
@@ -108,8 +107,7 @@ public class DispatchablePlanContext {
int workerId = serverEntry.getKey();
QueryServerInstance queryServerInstance = serverEntry.getValue();
serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
- WorkerMetadata workerMetadata = new WorkerMetadata(new VirtualServerAddress(queryServerInstance, workerId),
- workerIdToMailboxesMap.get(workerId));
+ WorkerMetadata workerMetadata = new WorkerMetadata(workerId, workerIdToMailboxesMap.get(workerId));
if (workerIdToSegmentsMap != null) {
workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 5415ca7019..db68010103 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -28,7 +28,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.routing.QueryServerInstance;
@@ -54,6 +54,8 @@ public class DispatchablePlanMetadata implements Serializable {
// info from PlanNode that requires singleton (e.g. SortNode/AggregateNode)
private boolean _requiresSingletonInstance;
+ // TODO: Change the following maps to lists
+
// --------------------------------------------------------------------------
// Fields extracted with {@link PinotDispatchPlanner}
// --------------------------------------------------------------------------
@@ -67,7 +69,7 @@ public class DispatchablePlanMetadata implements Serializable {
// used for build mailboxes between workers.
// workerId -> {planFragmentId -> mailbox list}
- private final Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
+ private final Map<Integer, Map<Integer, MailboxInfos>> _workerIdToMailboxesMap;
// used for tracking unavailable segments from routing table, then assemble missing segments exception.
private final Map<String, Set<String>> _tableToUnavailableSegmentsMap;
@@ -123,7 +125,7 @@ public class DispatchablePlanMetadata implements Serializable {
_workerIdToSegmentsMap = workerIdToSegmentsMap;
}
- public Map<Integer, Map<Integer, MailboxMetadata>> getWorkerIdToMailboxesMap() {
+ public Map<Integer, Map<Integer, MailboxInfos>> getWorkerIdToMailboxesMap() {
return _workerIdToMailboxesMap;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 421e7bbc9c..7bdcdb1343 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -19,18 +19,20 @@
package org.apache.pinot.query.planner.physical;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<Void, DispatchablePlanContext> {
@@ -47,8 +49,8 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverFragmentId);
Map<Integer, QueryServerInstance> senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap();
Map<Integer, QueryServerInstance> receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap();
- Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap();
- Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap();
+ Map<Integer, Map<Integer, MailboxInfos>> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap();
+ Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap();
int numSenders = senderServerMap.size();
int numReceivers = receiverServerMap.size();
@@ -63,11 +65,11 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
Preconditions.checkState(senderServer.equals(receiverServer),
"Got different server for SINGLETON distribution type for worker id: %s, sender: %s, receiver: %s",
workerId, senderServer, receiverServer);
- MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
- MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
- Collections.singletonList(new VirtualServerAddress(senderServer, workerId)));
- senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
- receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
+ MailboxInfos mailboxInfos = new SharedMailboxInfos(
+ new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(),
+ ImmutableList.of(workerId)));
+ senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxInfos);
+ receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxInfos);
}
} else if (senderMetadata.isPrePartitioned() && isDirectExchangeCompatible(senderMetadata, receiverMetadata)) {
// - direct exchange possible:
@@ -78,82 +80,77 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
if (partitionParallelism == 1) {
// 1-to-1 mapping
for (int workerId = 0; workerId < numSenders; workerId++) {
- String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
- MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
- Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)));
- MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
- Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)));
+ QueryServerInstance senderServer = senderServerMap.get(workerId);
+ QueryServerInstance receiverServer = receiverServerMap.get(workerId);
+ List<Integer> workerIds = ImmutableList.of(workerId);
+ MailboxInfos senderMailboxInfos;
+ MailboxInfos receiverMailboxInfos;
+ if (senderServer.equals(receiverServer)) {
+ senderMailboxInfos = new SharedMailboxInfos(
+ new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds));
+ receiverMailboxInfos = senderMailboxInfos;
+ } else {
+ senderMailboxInfos = new MailboxInfos(
+ new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds));
+ receiverMailboxInfos = new MailboxInfos(
+ new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), workerIds));
+ }
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
- .put(receiverFragmentId, serderMailboxMetadata);
+ .put(receiverFragmentId, receiverMailboxInfos);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
- .put(senderFragmentId, receiverMailboxMetadata);
+ .put(senderFragmentId, senderMailboxInfos);
}
} else {
// 1-to-<partition_parallelism> mapping
int receiverWorkerId = 0;
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
- VirtualServerAddress senderAddress =
- new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
- List<String> receivingMailboxIds = new ArrayList<>(partitionParallelism);
- List<VirtualServerAddress> receivingAddresses = new ArrayList<>(partitionParallelism);
- MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
- senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
- .put(receiverFragmentId, senderMailboxMetadata);
+ QueryServerInstance senderServer = senderServerMap.get(senderWorkerId);
+ QueryServerInstance receiverServer = receiverServerMap.get(receiverWorkerId);
+ List<Integer> receiverWorkerIds = new ArrayList<>(partitionParallelism);
+ senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()).put(receiverFragmentId,
+ new MailboxInfos(new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(),
+ receiverWorkerIds)));
+ MailboxInfos senderMailboxInfos = new SharedMailboxInfos(
+ new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(),
+ ImmutableList.of(senderWorkerId)));
for (int i = 0; i < partitionParallelism; i++) {
- String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
- receiverWorkerId);
- receivingMailboxIds.add(mailboxId);
- receivingAddresses.add(
- new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));
-
- MailboxMetadata receiverMailboxMetadata =
- receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
- .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
- receiverMailboxMetadata.getMailboxIds().add(mailboxId);
- receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
-
+ receiverWorkerIds.add(receiverWorkerId);
+ receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
+ .put(senderFragmentId, senderMailboxInfos);
receiverWorkerId++;
}
}
}
} else {
// For other exchange types, send the data to all the instances in the receiver fragment
- // NOTE: Keep the receiver worker id sequential in the senderMailboxMetadata so that the partitionId aligns with
- // the workerId. It is useful for JOIN query when only left table is partitioned.
// TODO: Add support for more exchange types
+ List<MailboxInfo> receiverMailboxInfoList = getMailboxInfos(receiverServerMap);
+ MailboxInfos receiverMailboxInfos = numSenders > 1 ? new SharedMailboxInfos(receiverMailboxInfoList)
+ : new MailboxInfos(receiverMailboxInfoList);
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
- VirtualServerAddress senderAddress =
- new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
- List<String> receivingMailboxIds = new ArrayList<>(numReceivers);
- List<VirtualServerAddress> receivingAddresses = new ArrayList<>(numReceivers);
- MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
- .put(receiverFragmentId, senderMailboxMetadata);
- for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
- String mailboxId =
- MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId);
- receivingMailboxIds.add(mailboxId);
- receivingAddresses.add(new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));
-
- MailboxMetadata receiverMailboxMetadata =
- receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
- .computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
- receiverMailboxMetadata.getMailboxIds().add(mailboxId);
- receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
- }
+ .put(receiverFragmentId, receiverMailboxInfos);
+ }
+ List<MailboxInfo> senderMailboxInfoList = getMailboxInfos(senderServerMap);
+ MailboxInfos senderMailboxInfos =
+ numReceivers > 1 ? new SharedMailboxInfos(senderMailboxInfoList) : new MailboxInfos(senderMailboxInfoList);
+ for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
+ receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
+ .put(senderFragmentId, senderMailboxInfos);
}
}
}
return null;
}
- private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, DispatchablePlanMetadata receiver) {
+ private static boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender,
+ DispatchablePlanMetadata receiver) {
Map<Integer, QueryServerInstance> senderServerMap = sender.getWorkerIdToServerInstanceMap();
Map<Integer, QueryServerInstance> receiverServerMap = receiver.getWorkerIdToServerInstanceMap();
int numSenders = senderServerMap.size();
int numReceivers = receiverServerMap.size();
- if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
+ if (!sender.getScannedTables().isEmpty() && receiver.getScannedTables().isEmpty()) {
// leaf-to-intermediate condition
return numSenders * sender.getPartitionParallelism() == numReceivers && sender.getPartitionFunction() != null
&& sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
@@ -163,4 +160,15 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
.equalsIgnoreCase(receiver.getPartitionFunction());
}
}
+
+ private static List<MailboxInfo> getMailboxInfos(Map<Integer, QueryServerInstance> workerIdToServerMap) {
+ Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = new HashMap<>();
+ int numServers = workerIdToServerMap.size();
+ for (int workerId = 0; workerId < numServers; workerId++) {
+ serverToWorkerIdsMap.computeIfAbsent(workerIdToServerMap.get(workerId), k -> new ArrayList<>()).add(workerId);
+ }
+ return serverToWorkerIdsMap.entrySet().stream()
+ .map(e -> new MailboxInfo(e.getKey().getHostname(), e.getKey().getQueryMailboxPort(), e.getValue()))
+ .collect(Collectors.toList());
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
index 32c7d3197a..f81e243dc7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
@@ -19,9 +19,10 @@
package org.apache.pinot.query.planner.physical;
import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.RoutingInfo;
public class MailboxIdUtils {
@@ -30,23 +31,35 @@ public class MailboxIdUtils {
public static final char SEPARATOR = '|';
- public static String toPlanMailboxId(int senderStageId, int senderWorkerId, int receiverStageId,
+ @VisibleForTesting
+ public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
int receiverWorkerId) {
- return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR + receiverStageId + SEPARATOR
- + receiverWorkerId;
- }
-
- public static String toMailboxId(long requestId, String planMailboxId) {
- return Long.toString(requestId) + SEPARATOR + planMailboxId;
+ return Long.toString(requestId) + SEPARATOR + senderStageId + SEPARATOR + senderWorkerId + SEPARATOR
+ + receiverStageId + SEPARATOR + receiverWorkerId;
}
- public static List<String> toMailboxIds(long requestId, MailboxMetadata mailboxMetadata) {
- return mailboxMetadata.getMailboxIds().stream().map(v -> toMailboxId(requestId, v)).collect(Collectors.toList());
+ public static List<RoutingInfo> toRoutingInfos(long requestId, int senderStageId, int senderWorkerId,
+ int receiverStageId, List<MailboxInfo> receiverMailboxInfos) {
+ List<RoutingInfo> routingInfos = new ArrayList<>();
+ for (MailboxInfo mailboxInfo : receiverMailboxInfos) {
+ String hostname = mailboxInfo.getHostname();
+ int port = mailboxInfo.getPort();
+ for (int receiverWorkerId : mailboxInfo.getWorkerIds()) {
+ routingInfos.add(new RoutingInfo(hostname, port,
+ toMailboxId(requestId, senderStageId, senderWorkerId, receiverStageId, receiverWorkerId)));
+ }
+ }
+ return routingInfos;
}
- @VisibleForTesting
- public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
- int receiverWorkerId) {
- return toMailboxId(requestId, toPlanMailboxId(senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
+ public static List<String> toMailboxIds(long requestId, int senderStageId, List<MailboxInfo> senderMailboxInfos,
+ int receiverStageId, int receiverWorkerId) {
+ List<String> mailboxIds = new ArrayList<>();
+ for (MailboxInfo mailboxInfo : senderMailboxInfos) {
+ for (int senderWorkerId : mailboxInfo.getWorkerIds()) {
+ mailboxIds.add(toMailboxId(requestId, senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
+ }
+ }
+ return mailboxIds;
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java
similarity index 50%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java
index b3484d1a7b..b7ad8c5f61 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfo.java
@@ -18,42 +18,37 @@
*/
package org.apache.pinot.query.routing;
-import java.util.ArrayList;
import java.util.List;
/**
- * {@code MailboxMetadata} wraps around a list of mailboxes information from/to one connected stage.
- * It contains the following information:
- * <ul>
- * <li>MailboxId: the unique id of the mailbox</li>
- * <li>VirtualAddress: the virtual address of the mailbox</li>
- * </ul>
+ * {@code MailboxInfo} wraps the mailbox information from/to one connected server.
*/
-public class MailboxMetadata {
- private final List<String> _mailboxIds;
- private final List<VirtualServerAddress> _virtualAddresses;
-
- public MailboxMetadata() {
- _mailboxIds = new ArrayList<>();
- _virtualAddresses = new ArrayList<>();
+public class MailboxInfo {
+ private final String _hostname;
+ private final int _port;
+ private final List<Integer> _workerIds;
+
+ public MailboxInfo(String hostname, int port, List<Integer> workerIds) {
+ _hostname = hostname;
+ _port = port;
+ _workerIds = workerIds;
}
- public MailboxMetadata(List<String> mailboxIds, List<VirtualServerAddress> virtualAddresses) {
- _mailboxIds = mailboxIds;
- _virtualAddresses = virtualAddresses;
+ public String getHostname() {
+ return _hostname;
}
- public List<String> getMailboxIds() {
- return _mailboxIds;
+ public int getPort() {
+ return _port;
}
- public List<VirtualServerAddress> getVirtualAddresses() {
- return _virtualAddresses;
+ public List<Integer> getWorkerIds() {
+ return _workerIds;
}
@Override
public String toString() {
- return _mailboxIds + "@" + _virtualAddresses;
+ return _hostname + ":" + _port + "|" + _workerIds;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java
similarity index 50%
copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java
index a45b48c5de..76c6e94224 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxInfos.java
@@ -16,36 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
-import org.apache.pinot.query.planner.plannode.PlanNode;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.util.List;
-/**
- * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}.
- *
- * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
- */
-public class StagePlan {
- private final int _stageId;
- private final PlanNode _rootNode;
- private final StageMetadata _stageMetadata;
+public class MailboxInfos {
+ private final List<MailboxInfo> _mailboxInfos;
- public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
- _stageId = stageId;
- _rootNode = rootNode;
- _stageMetadata = stageMetadata;
+ public MailboxInfos(List<MailboxInfo> mailboxInfos) {
+ _mailboxInfos = mailboxInfos;
}
- public int getStageId() {
- return _stageId;
+ public MailboxInfos(MailboxInfo mailboxInfo) {
+ _mailboxInfos = ImmutableList.of(mailboxInfo);
}
- public PlanNode getRootNode() {
- return _rootNode;
+ public List<MailboxInfo> getMailboxInfos() {
+ return _mailboxInfos;
}
- public StageMetadata getStageMetadata() {
- return _stageMetadata;
+ public ByteString toProtoBytes() {
+ return QueryPlanSerDeUtils.toProtoMailboxInfos(_mailboxInfos).toByteString();
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java
similarity index 53%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java
index fbfb9487b4..d692d634b4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/QueryPlanSerDeUtils.java
@@ -16,10 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.plan.serde;
+package org.apache.pinot.query.routing;
+import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -27,11 +29,6 @@ import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
/**
@@ -46,30 +43,35 @@ public class QueryPlanSerDeUtils {
AbstractPlanNode rootNode =
StageNodeSerDeUtils.deserializeStageNode(Plan.StageNode.parseFrom(protoStagePlan.getRootNode()));
StageMetadata stageMetadata = fromProtoStageMetadata(protoStagePlan.getStageMetadata());
- return new StagePlan(protoStagePlan.getStageId(), rootNode, stageMetadata);
+ return new StagePlan(rootNode, stageMetadata);
}
private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata)
throws InvalidProtocolBufferException {
- List<WorkerMetadata> workerMetadataList =
- protoStageMetadata.getWorkerMetadataList().stream().map(QueryPlanSerDeUtils::fromProtoWorkerMetadata)
- .collect(Collectors.toList());
+ List<Worker.WorkerMetadata> protoWorkerMetadataList = protoStageMetadata.getWorkerMetadataList();
+ List<WorkerMetadata> workerMetadataList = new ArrayList<>(protoWorkerMetadataList.size());
+ for (Worker.WorkerMetadata protoWorkerMetadata : protoWorkerMetadataList) {
+ workerMetadataList.add(fromProtoWorkerMetadata(protoWorkerMetadata));
+ }
Map<String, String> customProperties = fromProtoProperties(protoStageMetadata.getCustomProperty());
- return new StageMetadata(workerMetadataList, customProperties);
+ return new StageMetadata(protoStageMetadata.getStageId(), workerMetadataList, customProperties);
}
- private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) {
- VirtualServerAddress virtualAddress = VirtualServerAddress.parse(protoWorkerMetadata.getVirtualAddress());
- Map<Integer, MailboxMetadata> mailboxMetadataMap = protoWorkerMetadata.getMailboxMetadataMap().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> fromProtoMailbox(e.getValue())));
- return new WorkerMetadata(virtualAddress, mailboxMetadataMap, protoWorkerMetadata.getCustomPropertyMap());
+ private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata)
+ throws InvalidProtocolBufferException {
+ Map<Integer, ByteString> protoMailboxInfosMap = protoWorkerMetadata.getMailboxInfosMap();
+ Map<Integer, MailboxInfos> mailboxInfosMap = Maps.newHashMapWithExpectedSize(protoMailboxInfosMap.size());
+ for (Map.Entry<Integer, ByteString> entry : protoMailboxInfosMap.entrySet()) {
+ mailboxInfosMap.put(entry.getKey(), fromProtoMailboxInfos(entry.getValue()));
+ }
+ return new WorkerMetadata(protoWorkerMetadata.getWorkedId(), mailboxInfosMap,
+ protoWorkerMetadata.getCustomPropertyMap());
}
- private static MailboxMetadata fromProtoMailbox(Worker.MailboxMetadata protoMailboxMetadata) {
- List<VirtualServerAddress> virtualAddresses =
- protoMailboxMetadata.getVirtualAddressList().stream().map(VirtualServerAddress::parse)
- .collect(Collectors.toList());
- return new MailboxMetadata(protoMailboxMetadata.getMailboxIdList(), virtualAddresses);
+ private static MailboxInfos fromProtoMailboxInfos(ByteString protoMailboxInfos)
+ throws InvalidProtocolBufferException {
+ return new MailboxInfos(Worker.MailboxInfos.parseFrom(protoMailboxInfos).getMailboxInfoList().stream()
+ .map(v -> new MailboxInfo(v.getHostname(), v.getPort(), v.getWorkerIdList())).collect(Collectors.toList()));
}
public static Map<String, String> fromProtoProperties(ByteString protoProperties)
@@ -82,19 +84,17 @@ public class QueryPlanSerDeUtils {
}
private static Worker.WorkerMetadata toProtoWorkerMetadata(WorkerMetadata workerMetadata) {
- Map<Integer, Worker.MailboxMetadata> protoMailboxMetadataMap =
- workerMetadata.getMailboxMetadataMap().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> toProtoMailboxMetadata(e.getValue())));
- return Worker.WorkerMetadata.newBuilder().setVirtualAddress(workerMetadata.getVirtualAddress().toString())
- .putAllMailboxMetadata(protoMailboxMetadataMap).putAllCustomProperty(workerMetadata.getCustomProperties())
- .build();
+ Map<Integer, ByteString> mailboxInfosMap = workerMetadata.getMailboxInfosMap().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toProtoBytes()));
+ return Worker.WorkerMetadata.newBuilder().setWorkedId(workerMetadata.getWorkerId())
+ .putAllMailboxInfos(mailboxInfosMap).putAllCustomProperty(workerMetadata.getCustomProperties()).build();
}
- private static Worker.MailboxMetadata toProtoMailboxMetadata(MailboxMetadata mailboxMetadata) {
- List<String> virtualAddresses =
- mailboxMetadata.getVirtualAddresses().stream().map(VirtualServerAddress::toString).collect(Collectors.toList());
- return Worker.MailboxMetadata.newBuilder().addAllMailboxId(mailboxMetadata.getMailboxIds())
- .addAllVirtualAddress(virtualAddresses).build();
+ public static Worker.MailboxInfos toProtoMailboxInfos(List<MailboxInfo> mailboxInfos) {
+ List<Worker.MailboxInfo> protoMailboxInfos = mailboxInfos.stream().map(
+ v -> Worker.MailboxInfo.newBuilder().setHostname(v.getHostname()).setPort(v.getPort())
+ .addAllWorkerId(v.getWorkerIds()).build()).collect(Collectors.toList());
+ return Worker.MailboxInfos.newBuilder().addAllMailboxInfo(protoMailboxInfos).build();
}
public static ByteString toProtoProperties(Map<String, String> properties) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java
similarity index 50%
copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java
index a45b48c5de..7b92899237 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/RoutingInfo.java
@@ -16,36 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
-import org.apache.pinot.query.planner.plannode.PlanNode;
+public class RoutingInfo {
+ private final String _hostname;
+ private final int _port;
+ private final String _mailboxId;
-
-/**
- * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}.
- *
- * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
- */
-public class StagePlan {
- private final int _stageId;
- private final PlanNode _rootNode;
- private final StageMetadata _stageMetadata;
-
- public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
- _stageId = stageId;
- _rootNode = rootNode;
- _stageMetadata = stageMetadata;
+ public RoutingInfo(String hostname, int port, String mailboxId) {
+ _hostname = hostname;
+ _port = port;
+ _mailboxId = mailboxId;
}
- public int getStageId() {
- return _stageId;
+ public String getHostname() {
+ return _hostname;
}
- public PlanNode getRootNode() {
- return _rootNode;
+ public int getPort() {
+ return _port;
}
- public StageMetadata getStageMetadata() {
- return _stageMetadata;
+ public String getMailboxId() {
+ return _mailboxId;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java
similarity index 50%
copy from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java
index a45b48c5de..15fb1e666a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/SharedMailboxInfos.java
@@ -16,36 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
-import org.apache.pinot.query.planner.plannode.PlanNode;
+import com.google.protobuf.ByteString;
+import java.util.List;
/**
- * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}.
- *
- * <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
+ * {@code SharedMailboxInfos} is the shared version of the {@link MailboxInfos} which can cache the proto bytes and
+ * reduce overhead of serialization.
*/
-public class StagePlan {
- private final int _stageId;
- private final PlanNode _rootNode;
- private final StageMetadata _stageMetadata;
-
- public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
- _stageId = stageId;
- _rootNode = rootNode;
- _stageMetadata = stageMetadata;
- }
+public class SharedMailboxInfos extends MailboxInfos {
+ private ByteString _protoBytes;
- public int getStageId() {
- return _stageId;
+ public SharedMailboxInfos(List<MailboxInfo> mailboxInfos) {
+ super(mailboxInfos);
}
- public PlanNode getRootNode() {
- return _rootNode;
+ public SharedMailboxInfos(MailboxInfo mailboxInfo) {
+ super(mailboxInfo);
}
- public StageMetadata getStageMetadata() {
- return _stageMetadata;
+ @Override
+ public synchronized ByteString toProtoBytes() {
+ if (_protoBytes == null) {
+ _protoBytes = super.toProtoBytes();
+ }
+ return _protoBytes;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
similarity index 88%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
index a07a04a0b7..6eca815d68 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
@@ -16,27 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
-import org.apache.pinot.query.routing.WorkerMetadata;
/**
* {@code StageMetadata} is used to send plan fragment-level info about how to execute a stage physically.
*/
public class StageMetadata {
+ private final int _stageId;
private final List<WorkerMetadata> _workerMetadataList;
private final Map<String, String> _customProperties;
- public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+ public StageMetadata(int stageId, List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+ _stageId = stageId;
_workerMetadataList = workerMetadataList;
_customProperties = customProperties;
}
+ public int getStageId() {
+ return _stageId;
+ }
+
public List<WorkerMetadata> getWorkerMetadataList() {
return _workerMetadataList;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java
similarity index 85%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java
index a45b48c5de..7dfa00268d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StagePlan.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.routing;
import org.apache.pinot.query.planner.plannode.PlanNode;
@@ -27,20 +27,14 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
* <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
*/
public class StagePlan {
- private final int _stageId;
private final PlanNode _rootNode;
private final StageMetadata _stageMetadata;
- public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
- _stageId = stageId;
+ public StagePlan(PlanNode rootNode, StageMetadata stageMetadata) {
_rootNode = rootNode;
_stageMetadata = stageMetadata;
}
- public int getStageId() {
- return _stageId;
- }
-
public PlanNode getRootNode() {
return _rootNode;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
index 3392261980..ab1226862e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
@@ -44,29 +44,29 @@ import org.apache.pinot.spi.utils.JsonUtils;
public class WorkerMetadata {
public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
- private final VirtualServerAddress _virtualAddress;
- private final Map<Integer, MailboxMetadata> _mailboxMetadataMap;
+ private final int _workerId;
+ private final Map<Integer, MailboxInfos> _mailboxInfosMap;
private final Map<String, String> _customProperties;
- public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap) {
- _virtualAddress = virtualAddress;
- _mailboxMetadataMap = mailboxMetadataMap;
+ public WorkerMetadata(int workerId, Map<Integer, MailboxInfos> mailboxInfosMap) {
+ _workerId = workerId;
+ _mailboxInfosMap = mailboxInfosMap;
_customProperties = new HashMap<>();
}
- public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap,
+ public WorkerMetadata(int workerId, Map<Integer, MailboxInfos> mailboxInfosMap,
Map<String, String> customProperties) {
- _virtualAddress = virtualAddress;
- _mailboxMetadataMap = mailboxMetadataMap;
+ _workerId = workerId;
+ _mailboxInfosMap = mailboxInfosMap;
_customProperties = customProperties;
}
- public VirtualServerAddress getVirtualAddress() {
- return _virtualAddress;
+ public int getWorkerId() {
+ return _workerId;
}
- public Map<Integer, MailboxMetadata> getMailboxMetadataMap() {
- return _mailboxMetadataMap;
+ public Map<Integer, MailboxInfos> getMailboxInfosMap() {
+ return _mailboxInfosMap;
}
public Map<String, String> getCustomProperties() {
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 20d798c9c6..4bf911c805 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -19,7 +19,6 @@
package org.apache.pinot.query;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -30,7 +29,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.PlannerUtils;
-import org.apache.pinot.query.planner.explain.PhysicalExplainPlanVisitor;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
@@ -40,53 +38,43 @@ import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.plannode.ProjectNode;
-import org.testng.Assert;
+import org.apache.pinot.query.routing.QueryServerInstance;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
public class QueryCompilationTest extends QueryEnvironmentTestBase {
@Test(dataProvider = "testQueryLogicalPlanDataProvider")
- public void testQueryPlanExplainLogical(String query, String digest)
- throws Exception {
+ public void testQueryPlanExplainLogical(String query, String digest) {
testQueryPlanExplain(query, digest);
}
private void testQueryPlanExplain(String query, String digest) {
- try {
- long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
- String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
- Assert.assertEquals(explainedPlan, digest);
- } catch (RuntimeException e) {
- Assert.fail("failed to explain query: " + query, e);
- }
+ long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+ String explainedPlan = _queryEnvironment.explainQuery(query, requestId);
+ assertEquals(explainedPlan, digest);
}
@Test(dataProvider = "testQueryDataProvider")
- public void testQueryPlanWithoutException(String query)
- throws Exception {
- try {
- DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
- Assert.assertNotNull(dispatchableSubPlan);
- } catch (RuntimeException e) {
- Assert.fail("failed to plan query: " + query, e);
- }
+ public void testQueryPlanWithoutException(String query) {
+ DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
+ assertNotNull(dispatchableSubPlan);
}
@Test(dataProvider = "testQueryExceptionDataProvider")
public void testQueryWithException(String query, String exceptionSnippet) {
try {
_queryEnvironment.planQuery(query);
- Assert.fail("query plan should throw exception");
+ fail("query plan should throw exception");
} catch (RuntimeException e) {
- Assert.assertTrue(e.getCause().getMessage().contains(exceptionSnippet));
+ assertTrue(e.getCause().getMessage().contains(exceptionSnippet));
}
}
- private static void assertGroupBySingletonAfterJoin(DispatchableSubPlan dispatchableSubPlan, boolean shouldRewrite)
- throws Exception {
-
+ private static void assertGroupBySingletonAfterJoin(DispatchableSubPlan dispatchableSubPlan, boolean shouldRewrite) {
for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
if (dispatchableSubPlan.getTableNames().size() == 0 && !PlannerUtils.isRootPlanFragment(stageId)) {
PlanNode node = dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot();
@@ -95,17 +83,17 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
// JOIN is exchanged with hash distribution (data shuffle)
MailboxReceiveNode left = (MailboxReceiveNode) node.getInputs().get(0);
MailboxReceiveNode right = (MailboxReceiveNode) node.getInputs().get(1);
- Assert.assertEquals(left.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
- Assert.assertEquals(right.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
+ assertEquals(left.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
+ assertEquals(right.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
break;
}
if (node instanceof AggregateNode && node.getInputs().get(0) instanceof MailboxReceiveNode) {
// AGG is exchanged with singleton since it has already been distributed by JOIN.
MailboxReceiveNode input = (MailboxReceiveNode) node.getInputs().get(0);
if (shouldRewrite) {
- Assert.assertEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
+ assertEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
} else {
- Assert.assertNotEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
+ assertNotEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
}
break;
}
@@ -116,34 +104,40 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
}
@Test
- public void testQueryAndAssertStageContentForJoin()
- throws Exception {
+ public void testQueryAndAssertStageContentForJoin() {
String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2";
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
- Assert.assertEquals(dispatchableSubPlan.getQueryStageList().size(), 4);
-
- for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
- DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
- String tableName = dispatchablePlanFragment.getTableName();
+ List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
+ int numStages = stagePlans.size();
+ assertEquals(numStages, 4);
+ for (int stageId = 0; stageId < numStages; stageId++) {
+ DispatchablePlanFragment stagePlan = stagePlans.get(stageId);
+ Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = stagePlan.getServerInstanceToWorkerIdMap();
+ int numServers = serverToWorkerIdsMap.size();
+ String tableName = stagePlan.getTableName();
if (tableName != null) {
// table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1
- Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
- .collect(Collectors.toSet()),
- tableName.equals("a") ? ImmutableList.of("localhost:1|[1]", "localhost:2|[0]")
- : ImmutableList.of("localhost:1|[0]"));
+ if (tableName.equals("a")) {
+ assertEquals(numServers, 2);
+ for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) {
+ int port = server.getQueryMailboxPort();
+ assertTrue(port == 1 || port == 2);
+ }
+ } else {
+ assertEquals(numServers, 1);
+ assertEquals(serverToWorkerIdsMap.keySet().iterator().next().getQueryMailboxPort(), 1);
+ }
} else if (!PlannerUtils.isRootPlanFragment(stageId)) {
// join stage should have both servers used.
- Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
- .collect(Collectors.toSet()),
- ImmutableSet.of("localhost:1|[1]", "localhost:2|[0]"));
+ assertEquals(numServers, 2);
+ for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) {
+ int port = server.getQueryMailboxPort();
+ assertTrue(port == 1 || port == 2);
+ }
} else {
// reduce stage should have the reducer instance.
- Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
- .collect(Collectors.toSet()),
- ImmutableSet.of("localhost:3|[0]"));
+ assertEquals(numServers, 1);
+ assertEquals(serverToWorkerIdsMap.keySet().iterator().next().getQueryMailboxPort(), 3);
}
}
}
@@ -167,24 +161,27 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
public void testQueryRoutingManagerCompilation() {
String query = "SELECT * FROM d_OFFLINE";
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
- List<DispatchablePlanFragment> tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream()
- .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList());
- Assert.assertEquals(tableScanMetadataList.size(), 1);
- Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
+ List<DispatchablePlanFragment> tableScanMetadataList =
+ dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null)
+ .collect(Collectors.toList());
+ assertEquals(tableScanMetadataList.size(), 1);
+ assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
query = "SELECT * FROM d_REALTIME";
dispatchableSubPlan = _queryEnvironment.planQuery(query);
- tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream()
- .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList());
- Assert.assertEquals(tableScanMetadataList.size(), 1);
- Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 1);
+ tableScanMetadataList =
+ dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null)
+ .collect(Collectors.toList());
+ assertEquals(tableScanMetadataList.size(), 1);
+ assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 1);
query = "SELECT * FROM d";
dispatchableSubPlan = _queryEnvironment.planQuery(query);
- tableScanMetadataList = dispatchableSubPlan.getQueryStageList().stream()
- .filter(stageMetadata -> stageMetadata.getTableName() != null).collect(Collectors.toList());
- Assert.assertEquals(tableScanMetadataList.size(), 1);
- Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
+ tableScanMetadataList =
+ dispatchableSubPlan.getQueryStageList().stream().filter(stageMetadata -> stageMetadata.getTableName() != null)
+ .collect(Collectors.toList());
+ assertEquals(tableScanMetadataList.size(), 1);
+ assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
}
// Test that plan query can be run as multi-thread.
@@ -232,34 +229,36 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
}
for (ArrayList<DispatchableSubPlan> plans : queryPlans.values()) {
for (DispatchableSubPlan plan : plans) {
- Assert.assertTrue(plan.equals(plans.get(0)));
+ assertTrue(plan.equals(plans.get(0)));
}
}
}
@Test
- public void testQueryWithHint()
- throws Exception {
+ public void testQueryWithHint() {
// Hinting the query to use final stage aggregation makes server directly return final result
// This is useful when data is already partitioned by col1
String query = "SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ col1, COUNT(*) FROM b GROUP BY col1";
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query);
- Assert.assertEquals(dispatchableSubPlan.getQueryStageList().size(), 2);
- for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
- DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
- String tableName = dispatchablePlanFragment.getTableName();
+ List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
+ int numStages = stagePlans.size();
+ assertEquals(numStages, 2);
+ for (int stageId = 0; stageId < numStages; stageId++) {
+ DispatchablePlanFragment stagePlan = stagePlans.get(stageId);
+ Map<QueryServerInstance, List<Integer>> serverToWorkerIdsMap = stagePlan.getServerInstanceToWorkerIdMap();
+ int numServers = serverToWorkerIdsMap.size();
+ String tableName = stagePlan.getTableName();
if (tableName != null) {
// table scan stages; for tableB it should have only 1
- Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
- .collect(Collectors.toSet()),
- ImmutableList.of("localhost:1|[0]"));
+ assertEquals(numServers, 1);
+ assertEquals(stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next().getQueryMailboxPort(), 1);
} else if (!PlannerUtils.isRootPlanFragment(stageId)) {
// join stage should have both servers used.
- Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
- .collect(Collectors.toSet()),
- ImmutableList.of("localhost:1|[1]", "localhost:2|[0]"));
+ assertEquals(numServers, 2);
+ for (QueryServerInstance server : serverToWorkerIdsMap.keySet()) {
+ int port = server.getQueryMailboxPort();
+ assertTrue(port == 1 || port == 2);
+ }
}
}
}
@@ -269,105 +268,103 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
// A simple filter query with one table
String query = "Select * from a where col1 = 'a'";
List<String> tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 1);
- Assert.assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.size(), 1);
+ assertEquals(tableNames.get(0), "a");
// query with IN / NOT IN clause
- query = "SELECT COUNT(*) FROM a WHERE col1 IN (SELECT col1 FROM b) "
- + "and col1 NOT IN (SELECT col1 from c)";
+ query = "SELECT COUNT(*) FROM a WHERE col1 IN (SELECT col1 FROM b) " + "and col1 NOT IN (SELECT col1 from c)";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 3);
+ assertEquals(tableNames.size(), 3);
Collections.sort(tableNames);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
- Assert.assertEquals(tableNames.get(2), "c");
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(2), "c");
// query with JOIN clause
query = "SELECT a.col1, b.col2 FROM a JOIN b ON a.col3 = b.col3 WHERE a.col1 = 'a'";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 2);
+ assertEquals(tableNames.size(), 2);
Collections.sort(tableNames);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
// query with WHERE clause JOIN
query = "SELECT a.col1, b.col2 FROM a, b WHERE a.col3 = b.col3 AND a.col1 = 'a'";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 2);
+ assertEquals(tableNames.size(), 2);
Collections.sort(tableNames);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
// query with JOIN clause and table alias
query = "SELECT A.col1, B.col2 FROM a AS A JOIN b AS B ON A.col3 = B.col3 WHERE A.col1 = 'a'";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 2);
+ assertEquals(tableNames.size(), 2);
Collections.sort(tableNames);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
// query with UNION clause
query = "SELECT * FROM a UNION ALL SELECT * FROM b UNION ALL SELECT * FROM c";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 3);
+ assertEquals(tableNames.size(), 3);
Collections.sort(tableNames);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
- Assert.assertEquals(tableNames.get(2), "c");
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(2), "c");
// query with UNION clause and table alias
query = "SELECT * FROM (SELECT * FROM a) AS t1 UNION SELECT * FROM ( SELECT * FROM b) AS t2";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 2);
+ assertEquals(tableNames.size(), 2);
Collections.sort(tableNames);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
// query with UNION clause and table alias using WITH clause
- query = "WITH tmp1 AS (SELECT * FROM a), \n"
- + "tmp2 AS (SELECT * FROM b) \n"
+ query = "WITH tmp1 AS (SELECT * FROM a), \n" + "tmp2 AS (SELECT * FROM b) \n"
+ "SELECT * FROM tmp1 UNION ALL SELECT * FROM tmp2";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 2);
+ assertEquals(tableNames.size(), 2);
Collections.sort(tableNames);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
// query with aliases, JOIN, IN/NOT-IN, group-by
query = "with tmp as (select col1, sum(col3) as col3, count(*) from a where col1 = 'a' group by col1), "
+ "tmp2 as (select A.col1, B.col3 from b as A JOIN c AS B on A.col1 = B.col1) "
+ "select sum(col3) from tmp where col1 in (select col1 from tmp2) and col1 not in (select col1 from d)";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 4);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
- Assert.assertEquals(tableNames.get(2), "c");
- Assert.assertEquals(tableNames.get(3), "d");
+ assertEquals(tableNames.size(), 4);
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(2), "c");
+ assertEquals(tableNames.get(3), "d");
// query with aliases, JOIN, IN/NOT-IN, group-by and explain
query = "explain plan for with tmp as (select col1, sum(col3) as col3, count(*) from a where col1 = 'a' "
+ "group by col1), tmp2 as (select A.col1, B.col3 from b as A JOIN c AS B on A.col1 = B.col1) "
+ "select sum(col3) from tmp where col1 in (select col1 from tmp2) and col1 not in (select col1 from d)";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 4);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
- Assert.assertEquals(tableNames.get(2), "c");
- Assert.assertEquals(tableNames.get(3), "d");
+ assertEquals(tableNames.size(), 4);
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.get(2), "c");
+ assertEquals(tableNames.get(3), "d");
// lateral join query
query = "EXPLAIN PLAN FOR SELECT a.col1, newb.sum_col3 FROM a JOIN LATERAL "
+ "(SELECT SUM(col3) as sum_col3 FROM b WHERE col2 = a.col2) AS newb ON TRUE";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 2);
- Assert.assertEquals(tableNames.get(0), "a");
- Assert.assertEquals(tableNames.get(1), "b");
+ assertEquals(tableNames.size(), 2);
+ assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.get(1), "b");
// test for self join queries
query = "SELECT a.col1 FROM a JOIN(SELECT col2 FROM a) as self ON a.col1=self.col2 ";
tableNames = _queryEnvironment.getTableNamesForQuery(query);
- Assert.assertEquals(tableNames.size(), 1);
- Assert.assertEquals(tableNames.get(0), "a");
+ assertEquals(tableNames.size(), 1);
+ assertEquals(tableNames.get(0), "a");
}
// --------------------------------------------------------------------------
@@ -375,7 +372,7 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
// --------------------------------------------------------------------------
private static void assertNodeTypeNotIn(PlanNode node, List<Class<? extends AbstractPlanNode>> bannedNodeType) {
- Assert.assertFalse(isOneOf(bannedNodeType, node));
+ assertFalse(isOneOf(bannedNodeType, node));
for (PlanNode child : node.getInputs()) {
assertNodeTypeNotIn(child, bannedNodeType);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 4796383a0b..fe078e87c6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -34,8 +34,10 @@ import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.RoutingInfo;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
@@ -43,8 +45,6 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
@@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory;
public class QueryRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
+ private String _hostname;
+ private int _port;
private HelixManager _helixManager;
private ServerMetrics _serverMetrics;
@@ -88,15 +90,16 @@ public class QueryRunner {
*/
public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager,
ServerMetrics serverMetrics) {
- _helixManager = helixManager;
- _serverMetrics = serverMetrics;
-
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
if (hostname.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
hostname = hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
}
int port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
+ _hostname = hostname;
+ _port = port;
+ _helixManager = helixManager;
+ _serverMetrics = serverMetrics;
// TODO: Consider using separate config for intermediate stage and leaf stage
String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
@@ -163,25 +166,25 @@ public class QueryRunner {
// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+ int stageId = stageMetadata.getStageId();
LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId,
- stagePlan.getStageId(), errorBlock.getExceptions());
+ stageId, errorBlock.getExceptions());
int receiverStageId = ((MailboxSendNode) stagePlan.getRootNode()).getReceiverStageId();
- MailboxMetadata mailboxMetadata = workerMetadata.getMailboxMetadataMap().get(receiverStageId);
- List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
- List<VirtualServerAddress> virtualAddresses = mailboxMetadata.getVirtualAddresses();
- int numMailboxes = mailboxIds.size();
- for (int i = 0; i < numMailboxes; i++) {
- String mailboxId = mailboxIds.get(i);
- VirtualServerAddress virtualAddress = virtualAddresses.get(i);
+ List<MailboxInfo> receiverMailboxInfos =
+ workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+ List<RoutingInfo> routingInfos =
+ MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId,
+ receiverMailboxInfos);
+ for (RoutingInfo routingInfo : routingInfos) {
try {
- _mailboxService.getSendingMailbox(virtualAddress.hostname(), virtualAddress.port(), mailboxId, deadlineMs)
- .send(errorBlock);
+ _mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(),
+ routingInfo.getMailboxId(), deadlineMs).send(errorBlock);
} catch (TimeoutException e) {
- LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", mailboxId, requestId,
- stagePlan.getStageId(), e);
+ LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}",
+ routingInfo.getMailboxId(), requestId, stageId, e);
} catch (Exception e) {
- LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", mailboxId,
- requestId, stagePlan.getStageId(), e);
+ LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}",
+ routingInfo.getMailboxId(), requestId, stageId, e);
}
}
return;
@@ -189,8 +192,8 @@ public class QueryRunner {
// run OpChain
OpChainExecutionContext executionContext =
- new OpChainExecutionContext(_mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata,
- stageMetadata, workerMetadata, pipelineBreakerResult);
+ new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata,
+ workerMetadata, pipelineBreakerResult);
OpChain opChain;
if (workerMetadata.isLeafStageWorker()) {
opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 37903c2f72..808caba04e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.utils.AsyncStream;
import org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer;
@@ -58,9 +58,11 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
_exchangeType = exchangeType;
long requestId = context.getRequestId();
- MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(senderStageId);
- if (mailboxMetadata != null && !mailboxMetadata.getMailboxIds().isEmpty()) {
- _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
+ MailboxInfos mailboxInfos = context.getWorkerMetadata().getMailboxInfosMap().get(senderStageId);
+ if (mailboxInfos != null) {
+ _mailboxIds =
+ MailboxIdUtils.toMailboxIds(requestId, senderStageId, mailboxInfos.getMailboxInfos(), context.getStageId(),
+ context.getWorkerId());
} else {
_mailboxIds = Collections.emptyList();
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index f74feb1cac..5d66d6afac 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -20,11 +20,11 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
@@ -32,8 +32,8 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.RoutingInfo;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
@@ -91,18 +91,14 @@ public class MailboxSendOperator extends MultiStageOperator {
long requestId = context.getRequestId();
long deadlineMs = context.getDeadlineMs();
- MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(receiverStageId);
- List<String> sendingMailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
- List<VirtualServerAddress> sendingAddresses = mailboxMetadata.getVirtualAddresses();
- int numMailboxes = sendingMailboxIds.size();
- List<SendingMailbox> sendingMailboxes = new ArrayList<>(numMailboxes);
- for (int i = 0; i < numMailboxes; i++) {
- String sendingMailboxId = sendingMailboxIds.get(i);
- VirtualServerAddress sendingAddress = sendingAddresses.get(i);
- sendingMailboxes.add(
- mailboxService.getSendingMailbox(sendingAddress.hostname(), sendingAddress.port(), sendingMailboxId,
- deadlineMs));
- }
+ List<MailboxInfo> mailboxInfos =
+ context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+ List<RoutingInfo> routingInfos =
+ MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId,
+ mailboxInfos);
+ List<SendingMailbox> sendingMailboxes = routingInfos.stream()
+ .map(v -> mailboxService.getSendingMailbox(v.getHostname(), v.getPort(), v.getMailboxId(), deadlineMs))
+ .collect(Collectors.toList());
return BlockExchange.getExchange(sendingMailboxes, distributionType, distributionKeys,
TransferableBlockUtils::splitBlock);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 5059b2f8ec..51d61e5dbc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.runtime.plan;
import java.util.Collections;
import java.util.Map;
import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.operator.OpChainId;
@@ -38,11 +39,11 @@ import org.apache.pinot.spi.utils.CommonConstants;
public class OpChainExecutionContext {
private final MailboxService _mailboxService;
private final long _requestId;
- private final int _stageId;
private final long _deadlineMs;
private final Map<String, String> _opChainMetadata;
private final StageMetadata _stageMetadata;
private final WorkerMetadata _workerMetadata;
+ private final VirtualServerAddress _server;
private final OpChainId _id;
private final OpChainStats _stats;
private final PipelineBreakerResult _pipelineBreakerResult;
@@ -50,17 +51,18 @@ public class OpChainExecutionContext {
private ServerPlanRequestContext _leafStageContext;
- public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId, long deadlineMs,
+ public OpChainExecutionContext(MailboxService mailboxService, long requestId, long deadlineMs,
Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata,
PipelineBreakerResult pipelineBreakerResult) {
_mailboxService = mailboxService;
_requestId = requestId;
- _stageId = stageId;
_deadlineMs = deadlineMs;
_opChainMetadata = Collections.unmodifiableMap(opChainMetadata);
_stageMetadata = stageMetadata;
_workerMetadata = workerMetadata;
- _id = new OpChainId(requestId, workerMetadata.getVirtualAddress().workerId(), stageId);
+ _server =
+ new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), workerMetadata.getWorkerId());
+ _id = new OpChainId(requestId, workerMetadata.getWorkerId(), stageMetadata.getStageId());
_stats = new OpChainStats(_id.toString());
_pipelineBreakerResult = pipelineBreakerResult;
if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) {
@@ -78,11 +80,15 @@ public class OpChainExecutionContext {
}
public int getStageId() {
- return _stageId;
+ return _stageMetadata.getStageId();
+ }
+
+ public int getWorkerId() {
+ return _workerMetadata.getWorkerId();
}
public VirtualServerAddress getServer() {
- return _workerMetadata.getVirtualAddress();
+ return _server;
}
public long getDeadlineMs() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index aec7998e16..a033df03d7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -36,7 +37,6 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.StagePlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,12 +77,12 @@ public class PipelineBreakerExecutor {
// OpChain receive-mail callbacks.
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata,
+ new OpChainExecutionContext(mailboxService, requestId, deadlineMs, opChainMetadata,
stagePlan.getStageMetadata(), workerMetadata, null);
return execute(scheduler, pipelineBreakerContext, opChainExecutionContext);
} catch (Exception e) {
LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId,
- stagePlan.getStageId(), e);
+ stagePlan.getStageMetadata().getStageId(), e);
return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), Collections.emptyMap(),
TransferableBlockUtils.getErrorTransferableBlock(e), null);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 3c03fa1539..cc9d131962 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -24,7 +24,7 @@ import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.runtime.plan.StagePlan;
+import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 4c504f71d6..9b8d19d77d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -47,11 +47,11 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 06df28f561..f217c9b01a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -54,7 +54,9 @@ import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
+import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -63,8 +65,6 @@ import org.apache.pinot.query.runtime.operator.OpChainStats;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
@@ -170,11 +170,11 @@ public class QueryDispatcher {
QueryPlanSerDeUtils.toProtoWorkerMetadataList(workerMetadataList);
StageInfo stageInfo = stageInfos.get(i);
Worker.StageMetadata stageMetadata =
- Worker.StageMetadata.newBuilder().addAllWorkerMetadata(protoWorkerMetadataList)
+ Worker.StageMetadata.newBuilder().setStageId(stageId).addAllWorkerMetadata(protoWorkerMetadataList)
.setCustomProperty(stageInfo._customProperty).build();
requestBuilder.addStagePlan(
- Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(stageInfo._rootNode)
- .setStageMetadata(stageMetadata).build());
+ Worker.StagePlan.newBuilder().setRootNode(stageInfo._rootNode).setStageMetadata(stageMetadata)
+ .build());
}
}
requestBuilder.setMetadata(protoRequestMetadata);
@@ -264,10 +264,10 @@ public class QueryDispatcher {
List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
Preconditions.checkState(workerMetadataList.size() == 1, "Expecting single worker for reduce stage, got: %s",
workerMetadataList.size());
- StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
+ StageMetadata stageMetadata = new StageMetadata(0, workerMetadataList, dispatchableStagePlan.getCustomProperties());
OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(mailboxService, requestId, planFragment.getFragmentId(),
- System.currentTimeMillis() + timeoutMs, queryOptions, stageMetadata, workerMetadataList.get(0), null);
+ new OpChainExecutionContext(mailboxService, requestId, System.currentTimeMillis() + timeoutMs, queryOptions,
+ stageMetadata, workerMetadataList.get(0), null);
MailboxReceiveOperator receiveOperator =
new MailboxReceiveOperator(opChainExecutionContext, receiveNode.getDistributionType(),
receiveNode.getSenderStageId());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 2e52c28a5a..c8caed9100 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -31,11 +31,11 @@ import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
-import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
@@ -124,7 +124,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while deserializing stage plan for request: %d, stage: %d", requestId,
- protoStagePlan.getStageId()), e);
+ protoStagePlan.getStageMetadata().getStageId()), e);
}
StageMetadata stageMetadata = stagePlan.getStageMetadata();
List<WorkerMetadata> workerMetadataList = stageMetadata.getWorkerMetadataList();
@@ -142,7 +142,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while submitting request: %d, stage: %d", requestId,
- protoStagePlan.getStageId()), e);
+ stageMetadata.getStageId()), e);
} finally {
for (CompletableFuture<?> future : workerSubmissionStubs) {
if (!future.isDone()) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 1811218eda..3de9cc6fdb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -27,9 +27,9 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.data.Schema;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index c34c858e76..7a35202529 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -25,13 +25,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.utils.NamedThreadFactory;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -41,6 +41,8 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class OpChainSchedulerServiceTest {
@@ -70,10 +72,13 @@ public class OpChainSchedulerServiceTest {
}
private OpChain getChain(MultiStageOperator operator) {
- WorkerMetadata workerMetadata =
- new WorkerMetadata(new VirtualServerAddress("localhost", 123, 0), ImmutableMap.of(), ImmutableMap.of());
- OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, Long.MAX_VALUE, ImmutableMap.of(),
- new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
+ MailboxService mailboxService = mock(MailboxService.class);
+ when(mailboxService.getHostname()).thenReturn("localhost");
+ when(mailboxService.getPort()).thenReturn(1234);
+ WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of());
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, ImmutableMap.of(),
+ new StageMetadata(0, ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
return new OpChain(context, operator);
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 1a2949b142..2868f6526c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -29,13 +29,14 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
@@ -70,25 +71,21 @@ public class MailboxReceiveOperatorTest {
@BeforeClass
public void setUp() {
- VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
- VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
- _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0,
- new MailboxMetadata(
- ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2)), 1, new MailboxMetadata(
- ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of());
- _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))),
- ImmutableMap.of())), ImmutableMap.of());
+ MailboxInfos mailboxInfosBoth = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0, 1)));
+ _stageMetadataBoth = new StageMetadata(0, Stream.of(0, 1)
+ .map(workerId -> new WorkerMetadata(workerId, ImmutableMap.of(1, mailboxInfosBoth), ImmutableMap.of()))
+ .collect(Collectors.toList()), ImmutableMap.of());
+ MailboxInfos mailboxInfos1 = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0)));
+ _stageMetadata1 = new StageMetadata(0,
+ ImmutableList.of(new WorkerMetadata(0, ImmutableMap.of(1, mailboxInfos1), ImmutableMap.of())),
+ ImmutableMap.of());
}
@BeforeMethod
public void setUpMethod() {
_mocks = MockitoAnnotations.openMocks(this);
when(_mailboxService.getHostname()).thenReturn("localhost");
- when(_mailboxService.getPort()).thenReturn(123);
+ when(_mailboxService.getPort()).thenReturn(1234);
}
@AfterMethod
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 86b2ac0000..92f7b2ce7a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -25,13 +25,12 @@ import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.testng.annotations.AfterMethod;
@@ -52,20 +51,17 @@ public class MailboxSendOperatorTest {
private AutoCloseable _mocks;
@Mock
- private VirtualServerAddress _server;
+ private MailboxService _mailboxService;
@Mock
private MultiStageOperator _sourceOperator;
@Mock
- private MailboxService _mailboxService;
- @Mock
private BlockExchange _exchange;
@BeforeMethod
public void setUpMethod() {
_mocks = openMocks(this);
- when(_server.hostname()).thenReturn("mock");
- when(_server.port()).thenReturn(0);
- when(_server.workerId()).thenReturn(0);
+ when(_mailboxService.getHostname()).thenReturn("localhost");
+ when(_mailboxService.getPort()).thenReturn(1234);
}
@AfterMethod
@@ -198,11 +194,12 @@ public class MailboxSendOperatorTest {
}
private MailboxSendOperator getMailboxSendOperator() {
- WorkerMetadata workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(), ImmutableMap.of());
- StageMetadata stageMetadata = new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of());
+ WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of());
+ StageMetadata stageMetadata =
+ new StageMetadata(SENDER_STAGE_ID, ImmutableList.of(workerMetadata), ImmutableMap.of());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, Long.MAX_VALUE, ImmutableMap.of(),
- stageMetadata, workerMetadata, null);
+ new OpChainExecutionContext(_mailboxService, 123L, Long.MAX_VALUE, ImmutableMap.of(), stageMetadata,
+ workerMetadata, null);
return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index a74dec4e6f..f4cd38a42e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -44,15 +44,15 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.spi.utils.CommonConstants;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -76,15 +76,12 @@ public class OpChainTest {
private final List<TransferableBlock> _blockList = new ArrayList<>();
private final ExecutorService _executor = Executors.newCachedThreadPool();
private final AtomicReference<LeafStageTransferableBlockOperator> _leafOpRef = new AtomicReference<>();
- private final VirtualServerAddress _serverAddress = new VirtualServerAddress("localhost", 123, 0);
- private final WorkerMetadata _workerMetadata = new WorkerMetadata(_serverAddress, ImmutableMap.of(0,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
- ImmutableList.of(_serverAddress)), 1,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
- ImmutableList.of(_serverAddress)), 2,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
- ImmutableList.of(_serverAddress))), ImmutableMap.of());
- private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of());
+ private final MailboxInfos _mailboxInfos =
+ new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0)));
+ private final WorkerMetadata _workerMetadata =
+ new WorkerMetadata(0, ImmutableMap.of(0, _mailboxInfos, 1, _mailboxInfos, 2, _mailboxInfos), ImmutableMap.of());
+ private final StageMetadata _stageMetadata =
+ new StageMetadata(0, ImmutableList.of(_workerMetadata), ImmutableMap.of());
private AutoCloseable _mocks;
@Mock
@@ -199,13 +196,9 @@ public class OpChainTest {
public void testStatsCollectionTracingEnabledMultipleOperators() {
long dummyOperatorWaitTime = 1000L;
- int receivedStageId = 2;
- int senderStageId = 1;
- OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE,
+ OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 123L, Long.MAX_VALUE,
ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
-
- Stack<MultiStageOperator> operators =
- getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
+ Stack<MultiStageOperator> operators = getFullOpChain(context, dummyOperatorWaitTime);
OpChain opChain = new OpChain(context, operators.peek());
opChain.getStats().executing();
@@ -214,12 +207,10 @@ public class OpChainTest {
}
opChain.getStats().queued();
- OpChainExecutionContext secondStageContext =
- new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE,
- ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
-
+ OpChainExecutionContext secondStageContext = new OpChainExecutionContext(_mailboxService2, 123L, Long.MAX_VALUE,
+ ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
MailboxReceiveOperator secondStageReceiveOp =
- new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
+ new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, 1);
assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime);
int numOperators = operators.size();
@@ -238,14 +229,10 @@ public class OpChainTest {
public void testStatsCollectionTracingDisableMultipleOperators() {
long dummyOperatorWaitTime = 1000L;
- int receivedStageId = 2;
- int senderStageId = 1;
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE, ImmutableMap.of(),
- _stageMetadata, _workerMetadata, null);
-
- Stack<MultiStageOperator> operators =
- getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
+ new OpChainExecutionContext(_mailboxService1, 123L, Long.MAX_VALUE, ImmutableMap.of(), _stageMetadata,
+ _workerMetadata, null);
+ Stack<MultiStageOperator> operators = getFullOpChain(context, dummyOperatorWaitTime);
OpChain opChain = new OpChain(context, operators.peek());
opChain.getStats().executing();
@@ -253,10 +240,10 @@ public class OpChainTest {
opChain.getStats().queued();
OpChainExecutionContext secondStageContext =
- new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE, ImmutableMap.of(),
- _stageMetadata, _workerMetadata, null);
+ new OpChainExecutionContext(_mailboxService2, 123L, Long.MAX_VALUE, ImmutableMap.of(), _stageMetadata,
+ _workerMetadata, null);
MailboxReceiveOperator secondStageReceiveOp =
- new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
+ new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, 1);
assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime);
assertEquals(opChain.getStats().getOperatorStatsMap().size(), 2);
@@ -275,8 +262,7 @@ public class OpChainTest {
assertEquals(secondStageContext.getStats().getOperatorStatsMap().size(), 2);
}
- private Stack<MultiStageOperator> getFullOpchain(int receivedStageId, int senderStageId,
- OpChainExecutionContext context, long waitTimeInMillis) {
+ private Stack<MultiStageOperator> getFullOpChain(OpChainExecutionContext context, long waitTimeInMillis) {
Stack<MultiStageOperator> operators = new Stack<>();
DataSchema upStreamSchema = new DataSchema(new String[]{"intCol"}, new ColumnDataType[]{ColumnDataType.INT});
//Mailbox Receive Operator
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 3c132269c7..5280724541 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -26,15 +26,18 @@ import java.util.Map;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
import org.apache.pinot.spi.utils.CommonConstants;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class OperatorTestUtil {
// simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java.
@@ -78,7 +81,7 @@ public class OperatorTestUtil {
public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService, long deadlineMs,
StageMetadata stageMetadata) {
- return new OpChainExecutionContext(mailboxService, 0, 0, deadlineMs, ImmutableMap.of(), stageMetadata,
+ return new OpChainExecutionContext(mailboxService, 0, deadlineMs, ImmutableMap.of(), stageMetadata,
stageMetadata.getWorkerMetadataList().get(0), null);
}
@@ -91,9 +94,11 @@ public class OperatorTestUtil {
}
private static OpChainExecutionContext getDefaultContext(Map<String, String> opChainMetadata) {
- WorkerMetadata workerMetadata =
- new WorkerMetadata(new VirtualServerAddress("mock", 80, 0), ImmutableMap.of(), ImmutableMap.of());
- return new OpChainExecutionContext(null, 1, 2, Long.MAX_VALUE, opChainMetadata,
- new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
+ MailboxService mailboxService = mock(MailboxService.class);
+ when(mailboxService.getHostname()).thenReturn("localhost");
+ when(mailboxService.getPort()).thenReturn(1234);
+ WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(), ImmutableMap.of());
+ return new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE, opChainMetadata,
+ new StageMetadata(0, ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 1e71018215..314081588f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -34,13 +34,14 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
-import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
@@ -78,25 +79,21 @@ public class SortedMailboxReceiveOperatorTest {
@BeforeClass
public void setUp() {
- VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
- VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
- _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0,
- new MailboxMetadata(
- ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2)), 1, new MailboxMetadata(
- ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of());
- _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))),
- ImmutableMap.of())), ImmutableMap.of());
+ MailboxInfos mailboxInfosBoth = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0, 1)));
+ _stageMetadataBoth = new StageMetadata(0, Stream.of(0, 1)
+ .map(workerId -> new WorkerMetadata(workerId, ImmutableMap.of(1, mailboxInfosBoth), ImmutableMap.of()))
+ .collect(Collectors.toList()), ImmutableMap.of());
+ MailboxInfos mailboxInfos1 = new SharedMailboxInfos(new MailboxInfo("localhost", 1234, ImmutableList.of(0)));
+ _stageMetadata1 = new StageMetadata(0,
+ ImmutableList.of(new WorkerMetadata(0, ImmutableMap.of(1, mailboxInfos1), ImmutableMap.of())),
+ ImmutableMap.of());
}
@BeforeMethod
public void setUpMethod() {
_mocks = MockitoAnnotations.openMocks(this);
when(_mailboxService.getHostname()).thenReturn("localhost");
- when(_mailboxService.getPort()).thenReturn(123);
+ when(_mailboxService.getPort()).thenReturn(1234);
}
@AfterMethod
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 94d5e2b873..58dcd2106e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -34,7 +34,11 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
-import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.MailboxInfo;
+import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -42,8 +46,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -65,13 +67,12 @@ public class PipelineBreakerExecutorTest {
private final VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
private final ExecutorService _executor = Executors.newCachedThreadPool();
private final OpChainSchedulerService _scheduler = new OpChainSchedulerService(_executor);
- private final WorkerMetadata _workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(0, new MailboxMetadata(
- ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
- ImmutableList.of(_server, _server)), 1,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(_server)), 2,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), ImmutableList.of(_server))),
- ImmutableMap.of());
- private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of());
+ private final MailboxInfos _mailboxInfos =
+ new SharedMailboxInfos(new MailboxInfo("localhost", 123, ImmutableList.of(0)));
+ private final WorkerMetadata _workerMetadata =
+ new WorkerMetadata(0, ImmutableMap.of(1, _mailboxInfos, 2, _mailboxInfos), ImmutableMap.of());
+ private final StageMetadata _stageMetadata =
+ new StageMetadata(0, ImmutableList.of(_workerMetadata), ImmutableMap.of());
private AutoCloseable _mocks;
@Mock
@@ -107,7 +108,7 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode mailboxReceiveNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata);
+ StagePlan stagePlan = new StagePlan(mailboxReceiveNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -145,7 +146,7 @@ public class PipelineBreakerExecutorTest {
new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(mailboxReceiveNode2);
- StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
+ StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -181,7 +182,7 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode incorrectlyConfiguredMailboxNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- StagePlan stagePlan = new StagePlan(0, incorrectlyConfiguredMailboxNode, _stageMetadata);
+ StagePlan stagePlan = new StagePlan(incorrectlyConfiguredMailboxNode, _stageMetadata);
// when
PipelineBreakerResult pipelineBreakerResult =
@@ -204,7 +205,7 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode mailboxReceiveNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata);
+ StagePlan stagePlan = new StagePlan(mailboxReceiveNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -240,7 +241,7 @@ public class PipelineBreakerExecutorTest {
new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(incorrectlyConfiguredMailboxNode);
- StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
+ StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -278,7 +279,7 @@ public class PipelineBreakerExecutorTest {
new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(incorrectlyConfiguredMailboxNode);
- StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
+ StagePlan stagePlan = new StagePlan(joinNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index f1315aa1bf..298af4deaf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -58,9 +58,9 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -162,9 +162,9 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey());
List<WorkerMetadata> workerMetadataList =
entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList());
- StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
- StagePlan stagePlan =
- new StagePlan(stageId, dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata);
+ StageMetadata stageMetadata =
+ new StageMetadata(stageId, workerMetadataList, dispatchableStagePlan.getCustomProperties());
+ StagePlan stagePlan = new StagePlan(dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata);
for (WorkerMetadata workerMetadata : workerMetadataList) {
submissionStubs.add(serverEnclosure.processQuery(workerMetadata, stagePlan, requestMetadataMap));
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index 679f46c60c..2595d48617 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -42,11 +42,11 @@ import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
+import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.EqualityUtils;
@@ -135,7 +135,8 @@ public class QueryServerTest extends QueryTestSet {
DispatchablePlanFragment dispatchableStagePlan = stagePlans.get(stageId);
List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
- StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
+ StageMetadata stageMetadata =
+ new StageMetadata(stageId, workerMetadataList, dispatchableStagePlan.getCustomProperties());
// ensure mock query runner received correctly deserialized payload.
QueryRunner mockRunner = _queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT)));
@@ -190,8 +191,8 @@ public class QueryServerTest extends QueryTestSet {
}
private static boolean isWorkerMetadataEqual(WorkerMetadata expected, WorkerMetadata actual) {
- return expected.getVirtualAddress().equals(actual.getVirtualAddress()) && EqualityUtils.isEqual(
- expected.getTableSegmentsMap(), actual.getTableSegmentsMap());
+ return expected.getWorkerId() == actual.getWorkerId() && EqualityUtils.isEqual(expected.getTableSegmentsMap(),
+ actual.getTableSegmentsMap());
}
private static boolean isStageNodesEqual(PlanNode left, PlanNode right) {
@@ -235,11 +236,10 @@ public class QueryServerTest extends QueryTestSet {
// as it is not testing the multi-tenancy dispatch (which is in the QueryDispatcherTest)
QueryServerInstance serverInstance = stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next();
Worker.StageMetadata stageMetadata =
- Worker.StageMetadata.newBuilder().addAllWorkerMetadata(workerMetadataList).setCustomProperty(customProperty)
- .build();
+ Worker.StageMetadata.newBuilder().setStageId(stageId).addAllWorkerMetadata(workerMetadataList)
+ .setCustomProperty(customProperty).build();
Worker.StagePlan protoStagePlan =
- Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(rootNode.toByteString())
- .setStageMetadata(stageMetadata).build();
+ Worker.StagePlan.newBuilder().setRootNode(rootNode.toByteString()).setStageMetadata(stageMetadata).build();
Map<String, String> requestMetadata = new HashMap<>();
// the default configurations that must exist.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org