You are viewing a plain-text version of this content. The link to its canonical version is not included in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/07 19:16:55 UTC
(pinot) branch master updated: [Multi-stage] Optimize query plan serialization (#12370)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8434158758 [Multi-stage] Optimize query plan serialization (#12370)
8434158758 is described below
commit 8434158758e1824d3c38de5912925516b7bc5c8d
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Feb 7 11:16:49 2024 -0800
[Multi-stage] Optimize query plan serialization (#12370)
---
pinot-common/src/main/proto/mailbox.proto | 18 --
pinot-common/src/main/proto/plan.proto | 2 +-
pinot-common/src/main/proto/server.proto | 18 --
pinot-common/src/main/proto/worker.proto | 33 +---
.../explain/PhysicalExplainPlanVisitor.java | 6 +-
.../planner/physical/DispatchablePlanContext.java | 9 +-
.../planner/physical/MailboxAssignmentVisitor.java | 47 +++--
.../query/planner/physical/MailboxIdUtils.java | 26 ++-
.../pinot/query/routing/MailboxMetadata.java | 63 ++-----
.../apache/pinot/query/routing/WorkerMetadata.java | 97 +++++------
.../apache/pinot/query/mailbox/MailboxIdUtils.java | 50 ------
.../apache/pinot/query/runtime/QueryRunner.java | 58 ++++---
.../operator/BaseMailboxReceiveOperator.java | 8 +-
.../runtime/operator/MailboxSendOperator.java | 20 ++-
.../runtime/plan/OpChainExecutionContext.java | 17 +-
.../pinot/query/runtime/plan/StageMetadata.java | 58 +------
.../{DistributedStagePlan.java => StagePlan.java} | 36 +---
.../plan/pipeline/PipelineBreakerExecutor.java | 19 ++-
.../runtime/plan/serde/QueryPlanSerDeUtils.java | 154 +++++------------
.../plan/server/ServerPlanRequestContext.java | 10 +-
.../plan/server/ServerPlanRequestUtils.java | 65 +++----
.../query/service/dispatch/QueryDispatcher.java | 76 ++++++---
.../pinot/query/service/server/QueryServer.java | 42 +++--
.../apache/pinot/query/QueryServerEnclosure.java | 7 +-
.../pinot/query/mailbox/MailboxServiceTest.java | 13 +-
.../executor/OpChainSchedulerServiceTest.java | 12 +-
.../operator/MailboxReceiveOperatorTest.java | 70 ++++----
.../runtime/operator/MailboxSendOperatorTest.java | 17 +-
.../pinot/query/runtime/operator/OpChainTest.java | 56 +++---
.../query/runtime/operator/OperatorTestUtil.java | 30 ++--
.../operator/SortedMailboxReceiveOperatorTest.java | 71 ++++----
.../plan/pipeline/PipelineBreakerExecutorTest.java | 100 +++++------
.../plan/serde/QueryPlanSerDeUtilsTest.java | 58 -------
.../query/runtime/queries/QueryRunnerTestBase.java | 33 ++--
.../query/service/server/QueryServerTest.java | 189 ++++++++++-----------
35 files changed, 626 insertions(+), 962 deletions(-)
diff --git a/pinot-common/src/main/proto/mailbox.proto b/pinot-common/src/main/proto/mailbox.proto
index 6e1cca9a92..2ffe923c4d 100644
--- a/pinot-common/src/main/proto/mailbox.proto
+++ b/pinot-common/src/main/proto/mailbox.proto
@@ -17,24 +17,6 @@
// under the License.
//
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
syntax = "proto3";
package org.apache.pinot.common.proto;
diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto
index b36fa652b0..144a6fd7cc 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -78,4 +78,4 @@ message ListField {
// The key of the map is a string and the value of the map is a MemberVariableField.
message MapField {
map<string, MemberVariableField> content = 1;
-}
\ No newline at end of file
+}
diff --git a/pinot-common/src/main/proto/server.proto b/pinot-common/src/main/proto/server.proto
index 0239ae125f..7781d6f96e 100644
--- a/pinot-common/src/main/proto/server.proto
+++ b/pinot-common/src/main/proto/server.proto
@@ -17,24 +17,6 @@
// under the License.
//
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
syntax = "proto3";
package org.apache.pinot.common.proto;
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index dfb1cd53eb..b7e492fcc5 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -17,30 +17,10 @@
// under the License.
//
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
syntax = "proto3";
package org.apache.pinot.common.proto;
-import "plan.proto";
-
service PinotQueryWorker {
// Dispatch a QueryRequest to a PinotQueryWorker
rpc Submit(QueryRequest) returns (QueryResponse);
@@ -59,7 +39,7 @@ message CancelResponse {
// QueryRequest is the dispatched content for all query stages to a physical worker.
message QueryRequest {
repeated StagePlan stagePlan = 1;
- map<string, string> metadata = 2;
+ bytes metadata = 2; // Serialized Properties
}
// QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status.
@@ -70,15 +50,13 @@ message QueryResponse {
message StagePlan {
int32 stageId = 1;
- StageNode stageRoot = 2;
+ bytes rootNode = 2; // Serialized StageNode
StageMetadata stageMetadata = 3;
}
message StageMetadata {
repeated WorkerMetadata workerMetadata = 1;
- map<string, string> customProperty = 2;
- string serverAddress = 3;
- repeated int32 workerIds = 4;
+ bytes customProperty = 2; // Serialized Properties
}
message WorkerMetadata {
@@ -90,5 +68,8 @@ message WorkerMetadata {
message MailboxMetadata {
repeated string mailboxId = 1;
repeated string virtualAddress = 2;
- map<string, string> customProperty = 3;
+}
+
+message Properties {
+ map<string, string> property = 1;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
index e1a6ac1176..ea9bef1139 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java
@@ -61,7 +61,6 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
/**
* Explains the query plan.
*
- * @see DispatchableSubPlan#explain()
* @param dispatchableSubPlan the queryPlan to explain
* @return a String representation of the query plan tree
*/
@@ -216,9 +215,8 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
int receiverStageId = node.getReceiverStageId();
List<VirtualServerAddress> serverAddressList =
- _dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId())
- .getWorkerMetadataList().get(context._workerId)
- .getMailBoxInfosMap().get(receiverStageId).getVirtualAddressList();
+ _dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId()).getWorkerMetadataList()
+ .get(context._workerId).getMailboxMetadataMap().get(receiverStageId).getVirtualAddresses();
List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList);
context._builder.append("->");
String receivers = serverInstanceToWorkerIdList.stream()
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index f17f48fd2f..22744dda0e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -108,13 +108,12 @@ public class DispatchablePlanContext {
int workerId = serverEntry.getKey();
QueryServerInstance queryServerInstance = serverEntry.getValue();
serverInstanceToWorkerIdsMap.computeIfAbsent(queryServerInstance, k -> new ArrayList<>()).add(workerId);
- WorkerMetadata.Builder workerMetadataBuilder = new WorkerMetadata.Builder().setVirtualServerAddress(
- new VirtualServerAddress(queryServerInstance, workerId));
+ WorkerMetadata workerMetadata = new WorkerMetadata(new VirtualServerAddress(queryServerInstance, workerId),
+ workerIdToMailboxesMap.get(workerId));
if (workerIdToSegmentsMap != null) {
- workerMetadataBuilder.addTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
+ workerMetadata.setTableSegmentsMap(workerIdToSegmentsMap.get(workerId));
}
- workerMetadataBuilder.putAllMailBoxInfosMap(workerIdToMailboxesMap.get(workerId));
- workerMetadataArray[workerId] = workerMetadataBuilder.build();
+ workerMetadataArray[workerId] = workerMetadata;
}
// set the stageMetadata
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 736b443c53..421e7bbc9c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -19,8 +19,10 @@
package org.apache.pinot.query.planner.physical;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
@@ -63,7 +65,7 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
workerId, senderServer, receiverServer);
MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
- Collections.singletonList(new VirtualServerAddress(senderServer, workerId)), Collections.emptyMap());
+ Collections.singletonList(new VirtualServerAddress(senderServer, workerId)));
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
@@ -78,11 +80,9 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
for (int workerId = 0; workerId < numSenders; workerId++) {
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId);
MailboxMetadata serderMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
- Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)),
- Collections.emptyMap());
+ Collections.singletonList(new VirtualServerAddress(receiverServerMap.get(workerId), workerId)));
MailboxMetadata receiverMailboxMetadata = new MailboxMetadata(Collections.singletonList(mailboxId),
- Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)),
- Collections.emptyMap());
+ Collections.singletonList(new VirtualServerAddress(senderServerMap.get(workerId), workerId)));
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
.put(receiverFragmentId, serderMailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
@@ -94,22 +94,23 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
VirtualServerAddress senderAddress =
new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
- MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+ List<String> receivingMailboxIds = new ArrayList<>(partitionParallelism);
+ List<VirtualServerAddress> receivingAddresses = new ArrayList<>(partitionParallelism);
+ MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
.put(receiverFragmentId, senderMailboxMetadata);
for (int i = 0; i < partitionParallelism; i++) {
- VirtualServerAddress receiverAddress =
- new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
receiverWorkerId);
- senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
- senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+ receivingMailboxIds.add(mailboxId);
+ receivingAddresses.add(
+ new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));
MailboxMetadata receiverMailboxMetadata =
receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
- receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
- receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
+ receiverMailboxMetadata.getMailboxIds().add(mailboxId);
+ receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
receiverWorkerId++;
}
@@ -123,22 +124,22 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) {
VirtualServerAddress senderAddress =
new VirtualServerAddress(senderServerMap.get(senderWorkerId), senderWorkerId);
- MailboxMetadata senderMailboxMetadata = new MailboxMetadata();
+ List<String> receivingMailboxIds = new ArrayList<>(numReceivers);
+ List<VirtualServerAddress> receivingAddresses = new ArrayList<>(numReceivers);
+ MailboxMetadata senderMailboxMetadata = new MailboxMetadata(receivingMailboxIds, receivingAddresses);
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>())
.put(receiverFragmentId, senderMailboxMetadata);
for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) {
- VirtualServerAddress receiverAddress =
- new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId);
String mailboxId =
MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId, receiverWorkerId);
- senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
- senderMailboxMetadata.getVirtualAddressList().add(receiverAddress);
+ receivingMailboxIds.add(mailboxId);
+ receivingAddresses.add(new VirtualServerAddress(receiverServerMap.get(receiverWorkerId), receiverWorkerId));
MailboxMetadata receiverMailboxMetadata =
receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>())
.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
- receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
- receiverMailboxMetadata.getVirtualAddressList().add(senderAddress);
+ receiverMailboxMetadata.getMailboxIds().add(mailboxId);
+ receiverMailboxMetadata.getVirtualAddresses().add(senderAddress);
}
}
}
@@ -154,14 +155,12 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
int numReceivers = receiverServerMap.size();
if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
// leaf-to-intermediate condition
- return numSenders * sender.getPartitionParallelism() == numReceivers
- && sender.getPartitionFunction() != null
+ return numSenders * sender.getPartitionParallelism() == numReceivers && sender.getPartitionFunction() != null
&& sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
} else {
// dynamic-broadcast condition || intermediate-to-intermediate
- return numSenders == numReceivers
- && sender.getPartitionFunction() != null
- && sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());
+ return numSenders == numReceivers && sender.getPartitionFunction() != null && sender.getPartitionFunction()
+ .equalsIgnoreCase(receiver.getPartitionFunction());
}
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
index c949c6598f..32c7d3197a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxIdUtils.java
@@ -18,15 +18,35 @@
*/
package org.apache.pinot.query.planner.physical;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.query.routing.MailboxMetadata;
+
+
public class MailboxIdUtils {
private MailboxIdUtils() {
}
- private static final char SEPARATOR = '|';
+ public static final char SEPARATOR = '|';
public static String toPlanMailboxId(int senderStageId, int senderWorkerId, int receiverStageId,
int receiverWorkerId) {
- return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR
- + receiverStageId + SEPARATOR + receiverWorkerId;
+ return Integer.toString(senderStageId) + SEPARATOR + senderWorkerId + SEPARATOR + receiverStageId + SEPARATOR
+ + receiverWorkerId;
+ }
+
+ public static String toMailboxId(long requestId, String planMailboxId) {
+ return Long.toString(requestId) + SEPARATOR + planMailboxId;
+ }
+
+ public static List<String> toMailboxIds(long requestId, MailboxMetadata mailboxMetadata) {
+ return mailboxMetadata.getMailboxIds().stream().map(v -> toMailboxId(requestId, v)).collect(Collectors.toList());
+ }
+
+ @VisibleForTesting
+ public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
+ int receiverWorkerId) {
+ return toMailboxId(requestId, toPlanMailboxId(senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
index dcc46c8271..b3484d1a7b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/MailboxMetadata.java
@@ -19,10 +19,7 @@
package org.apache.pinot.query.routing;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
/**
@@ -31,68 +28,32 @@ import java.util.Objects;
* <ul>
* <li>MailboxId: the unique id of the mailbox</li>
* <li>VirtualAddress: the virtual address of the mailbox</li>
- * <li>CustomProperties: the custom properties of the mailbox</li>
* </ul>
*/
public class MailboxMetadata {
- private final List<String> _mailBoxIdList;
- private final List<VirtualServerAddress> _virtualAddressList;
- private final Map<String, String> _customProperties;
+ private final List<String> _mailboxIds;
+ private final List<VirtualServerAddress> _virtualAddresses;
public MailboxMetadata() {
- _mailBoxIdList = new ArrayList<>();
- _virtualAddressList = new ArrayList<>();
- _customProperties = new HashMap<>();
+ _mailboxIds = new ArrayList<>();
+ _virtualAddresses = new ArrayList<>();
}
- public MailboxMetadata(List<String> mailBoxIdList, List<VirtualServerAddress> virtualAddressList,
- Map<String, String> customProperties) {
- _mailBoxIdList = mailBoxIdList;
- _virtualAddressList = virtualAddressList;
- _customProperties = customProperties;
+ public MailboxMetadata(List<String> mailboxIds, List<VirtualServerAddress> virtualAddresses) {
+ _mailboxIds = mailboxIds;
+ _virtualAddresses = virtualAddresses;
}
- public List<String> getMailBoxIdList() {
- return _mailBoxIdList;
+ public List<String> getMailboxIds() {
+ return _mailboxIds;
}
- public String getMailBoxId(int index) {
- return _mailBoxIdList.get(index);
- }
-
- public List<VirtualServerAddress> getVirtualAddressList() {
- return _virtualAddressList;
- }
-
- public VirtualServerAddress getVirtualAddress(int index) {
- return _virtualAddressList.get(index);
- }
-
- public Map<String, String> getCustomProperties() {
- return _customProperties;
+ public List<VirtualServerAddress> getVirtualAddresses() {
+ return _virtualAddresses;
}
@Override
public String toString() {
- return _mailBoxIdList + "@" + _virtualAddressList.toString() + "#" + _customProperties.toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(_mailBoxIdList, _virtualAddressList, _customProperties);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- MailboxMetadata that = (MailboxMetadata) o;
- return Objects.equals(_mailBoxIdList, that._mailBoxIdList)
- && Objects.equals(_virtualAddressList, that._virtualAddressList)
- && _customProperties.equals(that._customProperties);
+ return _mailboxIds + "@" + _virtualAddresses;
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
index 9d92bfb697..3392261980 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -41,85 +42,63 @@ import org.apache.pinot.spi.utils.JsonUtils;
* MailboxSendNode and MailboxReceiveNode to derive the info during runtime. this should changed to plan time soon.
*/
public class WorkerMetadata {
- private final VirtualServerAddress _virtualServerAddress;
- private final Map<Integer, MailboxMetadata> _mailBoxInfosMap;
+ public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
+
+ private final VirtualServerAddress _virtualAddress;
+ private final Map<Integer, MailboxMetadata> _mailboxMetadataMap;
private final Map<String, String> _customProperties;
- private WorkerMetadata(VirtualServerAddress virtualServerAddress, Map<Integer, MailboxMetadata> mailBoxInfosMap,
+ public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap) {
+ _virtualAddress = virtualAddress;
+ _mailboxMetadataMap = mailboxMetadataMap;
+ _customProperties = new HashMap<>();
+ }
+
+ public WorkerMetadata(VirtualServerAddress virtualAddress, Map<Integer, MailboxMetadata> mailboxMetadataMap,
Map<String, String> customProperties) {
- _virtualServerAddress = virtualServerAddress;
- _mailBoxInfosMap = mailBoxInfosMap;
+ _virtualAddress = virtualAddress;
+ _mailboxMetadataMap = mailboxMetadataMap;
_customProperties = customProperties;
}
- public VirtualServerAddress getVirtualServerAddress() {
- return _virtualServerAddress;
+ public VirtualServerAddress getVirtualAddress() {
+ return _virtualAddress;
}
- public Map<Integer, MailboxMetadata> getMailBoxInfosMap() {
- return _mailBoxInfosMap;
+ public Map<Integer, MailboxMetadata> getMailboxMetadataMap() {
+ return _mailboxMetadataMap;
}
public Map<String, String> getCustomProperties() {
return _customProperties;
}
- public static class Builder {
- public static final String TABLE_SEGMENTS_MAP_KEY = "tableSegmentsMap";
- private VirtualServerAddress _virtualServerAddress;
- private Map<Integer, MailboxMetadata> _mailBoxInfosMap;
- private Map<String, String> _customProperties;
-
- public Builder() {
- _mailBoxInfosMap = new HashMap<>();
- _customProperties = new HashMap<>();
- }
-
- public Builder setVirtualServerAddress(VirtualServerAddress virtualServerAddress) {
- _virtualServerAddress = virtualServerAddress;
- return this;
- }
-
- public Builder putAllMailBoxInfosMap(Map<Integer, MailboxMetadata> mailBoxInfosMap) {
- _mailBoxInfosMap.putAll(mailBoxInfosMap);
- return this;
- }
-
- public Builder addMailBoxInfoMap(Integer planFragmentId, MailboxMetadata mailBoxMetadata) {
- _mailBoxInfosMap.put(planFragmentId, mailBoxMetadata);
- return this;
- }
-
- public Builder addTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) {
- try {
- String tableSegmentsMapStr = JsonUtils.objectToString(tableSegmentsMap);
- _customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr);
- } catch (JsonProcessingException e) {
- throw new RuntimeException("Unable to serialize table segments map", e);
- }
- return this;
- }
-
- public WorkerMetadata build() {
- return new WorkerMetadata(_virtualServerAddress, _mailBoxInfosMap, _customProperties);
- }
-
- public void putAllCustomProperties(Map<String, String> customPropertyMap) {
- _customProperties.putAll(customPropertyMap);
- }
- }
-
- public static Map<String, List<String>> getTableSegmentsMap(WorkerMetadata workerMetadata) {
- String tableSegmentKeyStr = workerMetadata.getCustomProperties().get(Builder.TABLE_SEGMENTS_MAP_KEY);
- if (tableSegmentKeyStr != null) {
+ @Nullable
+ public Map<String, List<String>> getTableSegmentsMap() {
+ String tableSegmentsMapStr = _customProperties.get(TABLE_SEGMENTS_MAP_KEY);
+ if (tableSegmentsMapStr != null) {
try {
- return JsonUtils.stringToObject(tableSegmentKeyStr, new TypeReference<Map<String, List<String>>>() {
+ return JsonUtils.stringToObject(tableSegmentsMapStr, new TypeReference<Map<String, List<String>>>() {
});
} catch (IOException e) {
- throw new RuntimeException("Unable to deserialize table segments map", e);
+ throw new RuntimeException("Unable to deserialize table segments map: " + tableSegmentsMapStr, e);
}
} else {
return null;
}
}
+
+ public boolean isLeafStageWorker() {
+ return _customProperties.containsKey(TABLE_SEGMENTS_MAP_KEY);
+ }
+
+ public void setTableSegmentsMap(Map<String, List<String>> tableSegmentsMap) {
+ String tableSegmentsMapStr;
+ try {
+ tableSegmentsMapStr = JsonUtils.objectToString(tableSegmentsMap);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Unable to serialize table segments map: " + tableSegmentsMap, e);
+ }
+ _customProperties.put(TABLE_SEGMENTS_MAP_KEY, tableSegmentsMapStr);
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
deleted file mode 100644
index 7168afc486..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.mailbox;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.pinot.query.routing.MailboxMetadata;
-
-
-// TODO: De-couple mailbox id from query information
-public class MailboxIdUtils {
- private MailboxIdUtils() {
- }
-
- private static final char SEPARATOR = '|';
-
- @VisibleForTesting
- public static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
- int receiverWorkerId) {
- return Long.toString(requestId) + SEPARATOR + senderStageId + SEPARATOR + senderWorkerId + SEPARATOR
- + receiverStageId + SEPARATOR + receiverWorkerId;
- }
-
- public static List<String> toMailboxIds(long requestId, MailboxMetadata mailBoxMetadata) {
- return toMailboxIds(requestId, mailBoxMetadata.getMailBoxIdList());
- }
-
- public static List<String> toMailboxIds(long requestId, List<String> mailboxMetadataIdList) {
- return mailboxMetadataIdList.stream()
- .map(mailboxIdFromBroker -> Long.toString(requestId) + SEPARATOR + mailboxIdFromBroker)
- .collect(Collectors.toList());
- }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 526a489fb6..4796383a0b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -31,17 +31,20 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
@@ -54,7 +57,7 @@ import org.slf4j.LoggerFactory;
/**
- * {@link QueryRunner} accepts a {@link DistributedStagePlan} and runs it.
+ * {@link QueryRunner} accepts a {@link StagePlan} and runs it.
*/
public class QueryRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
@@ -139,42 +142,46 @@ public class QueryRunner {
}
/**
- * Execute a {@link DistributedStagePlan}.
+ * Execute a {@link StagePlan}.
*
* <p>This execution entry point should be asynchronously called by the request handler and caller should not wait
* for results/exceptions.</p>
*/
- public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata) {
+ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> requestMetadata) {
long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
- Map<String, String> opChainMetadata = consolidateMetadata(
- distributedStagePlan.getStageMetadata().getCustomProperties(), requestMetadata);
long deadlineMs = System.currentTimeMillis() + timeoutMs;
+ StageMetadata stageMetadata = stagePlan.getStageMetadata();
+ Map<String, String> opChainMetadata = consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
+
// run pre-stage execution for all pipeline breakers
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, distributedStagePlan,
+ PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, workerMetadata, stagePlan,
opChainMetadata, requestId, deadlineMs);
// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId,
- distributedStagePlan.getStageId(), errorBlock.getExceptions());
- int receiverStageId = ((MailboxSendNode) distributedStagePlan.getStageRoot()).getReceiverStageId();
- MailboxMetadata mailboxMetadata = distributedStagePlan.getStageMetadata().getWorkerMetadataList()
- .get(distributedStagePlan.getServer().workerId()).getMailBoxInfosMap().get(receiverStageId);
+ stagePlan.getStageId(), errorBlock.getExceptions());
+ int receiverStageId = ((MailboxSendNode) stagePlan.getRootNode()).getReceiverStageId();
+ MailboxMetadata mailboxMetadata = workerMetadata.getMailboxMetadataMap().get(receiverStageId);
List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
- for (int i = 0; i < mailboxIds.size(); i++) {
+ List<VirtualServerAddress> virtualAddresses = mailboxMetadata.getVirtualAddresses();
+ int numMailboxes = mailboxIds.size();
+ for (int i = 0; i < numMailboxes; i++) {
+ String mailboxId = mailboxIds.get(i);
+ VirtualServerAddress virtualAddress = virtualAddresses.get(i);
try {
- _mailboxService.getSendingMailbox(mailboxMetadata.getVirtualAddress(i).hostname(),
- mailboxMetadata.getVirtualAddress(i).port(), mailboxIds.get(i), deadlineMs).send(errorBlock);
+ _mailboxService.getSendingMailbox(virtualAddress.hostname(), virtualAddress.port(), mailboxId, deadlineMs)
+ .send(errorBlock);
} catch (TimeoutException e) {
- LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", mailboxIds.get(i),
- requestId, distributedStagePlan.getStageId(), e);
+ LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", mailboxId, requestId,
+ stagePlan.getStageId(), e);
} catch (Exception e) {
- LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}",
- mailboxIds.get(i), requestId, distributedStagePlan.getStageId(), e);
+ LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", mailboxId,
+ requestId, stagePlan.getStageId(), e);
}
}
return;
@@ -182,15 +189,14 @@ public class QueryRunner {
// run OpChain
OpChainExecutionContext executionContext =
- new OpChainExecutionContext(_mailboxService, requestId, distributedStagePlan.getStageId(),
- distributedStagePlan.getServer(), deadlineMs, opChainMetadata, distributedStagePlan.getStageMetadata(),
- pipelineBreakerResult);
+ new OpChainExecutionContext(_mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata,
+ stageMetadata, workerMetadata, pipelineBreakerResult);
OpChain opChain;
- if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
- opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, distributedStagePlan, _helixManager,
- _serverMetrics, _leafQueryExecutor, _executorService);
+ if (workerMetadata.isLeafStageWorker()) {
+ opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics,
+ _leafQueryExecutor, _executorService);
} else {
- opChain = PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
+ opChain = PhysicalPlanVisitor.walkPlanNode(stagePlan.getRootNode(), executionContext);
}
_opChainScheduler.register(opChain);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 438cec8494..37903c2f72 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -24,9 +24,9 @@ import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.utils.AsyncStream;
@@ -58,10 +58,8 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
_exchangeType = exchangeType;
long requestId = context.getRequestId();
- int workerId = context.getServer().workerId();
- MailboxMetadata mailboxMetadata =
- context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(senderStageId);
- if (mailboxMetadata != null && !mailboxMetadata.getMailBoxIdList().isEmpty()) {
+ MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(senderStageId);
+ if (mailboxMetadata != null && !mailboxMetadata.getMailboxIds().isEmpty()) {
_mailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
} else {
_mailboxIds = Collections.emptyList();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index f6f25510df..f74feb1cac 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,11 +28,12 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
@@ -90,14 +91,17 @@ public class MailboxSendOperator extends MultiStageOperator {
long requestId = context.getRequestId();
long deadlineMs = context.getDeadlineMs();
- int workerId = context.getServer().workerId();
- MailboxMetadata mailboxMetadata =
- context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(receiverStageId);
+ MailboxMetadata mailboxMetadata = context.getWorkerMetadata().getMailboxMetadataMap().get(receiverStageId);
List<String> sendingMailboxIds = MailboxIdUtils.toMailboxIds(requestId, mailboxMetadata);
- List<SendingMailbox> sendingMailboxes = new ArrayList<>(sendingMailboxIds.size());
- for (int i = 0; i < sendingMailboxIds.size(); i++) {
- sendingMailboxes.add(mailboxService.getSendingMailbox(mailboxMetadata.getVirtualAddress(i).hostname(),
- mailboxMetadata.getVirtualAddress(i).port(), sendingMailboxIds.get(i), deadlineMs));
+ List<VirtualServerAddress> sendingAddresses = mailboxMetadata.getVirtualAddresses();
+ int numMailboxes = sendingMailboxIds.size();
+ List<SendingMailbox> sendingMailboxes = new ArrayList<>(numMailboxes);
+ for (int i = 0; i < numMailboxes; i++) {
+ String sendingMailboxId = sendingMailboxIds.get(i);
+ VirtualServerAddress sendingAddress = sendingAddresses.get(i);
+ sendingMailboxes.add(
+ mailboxService.getSendingMailbox(sendingAddress.hostname(), sendingAddress.port(), sendingMailboxId,
+ deadlineMs));
}
return BlockExchange.getExchange(sendingMailboxes, distributionType, distributionKeys,
TransferableBlockUtils::splitBlock);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 10069167d6..5059b2f8ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Map;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.operator.OpChainStats;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
@@ -38,10 +39,10 @@ public class OpChainExecutionContext {
private final MailboxService _mailboxService;
private final long _requestId;
private final int _stageId;
- private final VirtualServerAddress _server;
private final long _deadlineMs;
private final Map<String, String> _opChainMetadata;
private final StageMetadata _stageMetadata;
+ private final WorkerMetadata _workerMetadata;
private final OpChainId _id;
private final OpChainStats _stats;
private final PipelineBreakerResult _pipelineBreakerResult;
@@ -49,17 +50,17 @@ public class OpChainExecutionContext {
private ServerPlanRequestContext _leafStageContext;
- public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
- VirtualServerAddress server, long deadlineMs, Map<String, String> opChainMetadata, StageMetadata stageMetadata,
+ public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId, long deadlineMs,
+ Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata,
PipelineBreakerResult pipelineBreakerResult) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
- _server = server;
_deadlineMs = deadlineMs;
_opChainMetadata = Collections.unmodifiableMap(opChainMetadata);
_stageMetadata = stageMetadata;
- _id = new OpChainId(requestId, server.workerId(), stageId);
+ _workerMetadata = workerMetadata;
+ _id = new OpChainId(requestId, workerMetadata.getVirtualAddress().workerId(), stageId);
_stats = new OpChainStats(_id.toString());
_pipelineBreakerResult = pipelineBreakerResult;
if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) {
@@ -81,7 +82,7 @@ public class OpChainExecutionContext {
}
public VirtualServerAddress getServer() {
- return _server;
+ return _workerMetadata.getVirtualAddress();
}
public long getDeadlineMs() {
@@ -96,6 +97,10 @@ public class OpChainExecutionContext {
return _stageMetadata;
}
+ public WorkerMetadata getWorkerMetadata() {
+ return _workerMetadata;
+ }
+
public OpChainId getId() {
return _id;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
index f2543a3363..a07a04a0b7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
@@ -18,11 +18,10 @@
*/
package org.apache.pinot.query.runtime.plan;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -33,9 +32,9 @@ public class StageMetadata {
private final List<WorkerMetadata> _workerMetadataList;
private final Map<String, String> _customProperties;
- StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+ public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
_workerMetadataList = workerMetadataList;
- _customProperties = Collections.unmodifiableMap(customProperties);
+ _customProperties = customProperties;
}
public List<WorkerMetadata> getWorkerMetadataList() {
@@ -46,54 +45,13 @@ public class StageMetadata {
return _customProperties;
}
- public static class Builder {
- public static final String TABLE_NAME_KEY = "tableName";
- public static final String TIME_BOUNDARY_COLUMN_KEY = "timeBoundaryInfo.timeColumn";
- public static final String TIME_BOUNDARY_VALUE_KEY = "timeBoundaryInfo.timeValue";
- private List<WorkerMetadata> _workerMetadataList;
- private Map<String, String> _customProperties;
-
- public Builder() {
- _customProperties = new HashMap<>();
- }
-
- public Builder setWorkerMetadataList(List<WorkerMetadata> workerMetadataList) {
- _workerMetadataList = workerMetadataList;
- return this;
- }
-
- public Builder addTableName(String tableName) {
- _customProperties.put(TABLE_NAME_KEY, tableName);
- return this;
- }
-
- public Builder addTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
- _customProperties.put(TIME_BOUNDARY_COLUMN_KEY, timeBoundaryInfo.getTimeColumn());
- _customProperties.put(TIME_BOUNDARY_VALUE_KEY, timeBoundaryInfo.getTimeValue());
- return this;
- }
-
- public Builder addCustomProperties(Map<String, String> customPropertyMap) {
- _customProperties.putAll(customPropertyMap);
- return this;
- }
-
- public StageMetadata build() {
- return new StageMetadata(_workerMetadataList, _customProperties);
- }
-
- public void putAllCustomProperties(Map<String, String> customPropertyMap) {
- _customProperties.putAll(customPropertyMap);
- }
- }
-
- public static String getTableName(StageMetadata metadata) {
- return metadata.getCustomProperties().get(Builder.TABLE_NAME_KEY);
+ public String getTableName() {
+ return _customProperties.get(DispatchablePlanFragment.TABLE_NAME_KEY);
}
- public static TimeBoundaryInfo getTimeBoundary(StageMetadata metadata) {
- String timeColumn = metadata.getCustomProperties().get(Builder.TIME_BOUNDARY_COLUMN_KEY);
- String timeValue = metadata.getCustomProperties().get(Builder.TIME_BOUNDARY_VALUE_KEY);
+ public TimeBoundaryInfo getTimeBoundary() {
+ String timeColumn = _customProperties.get(DispatchablePlanFragment.TIME_BOUNDARY_COLUMN_KEY);
+ String timeValue = _customProperties.get(DispatchablePlanFragment.TIME_BOUNDARY_VALUE_KEY);
return timeColumn != null && timeValue != null ? new TimeBoundaryInfo(timeColumn, timeValue) : null;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
similarity index 53%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
index 62e8d19254..a45b48c5de 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StagePlan.java
@@ -18,30 +18,22 @@
*/
package org.apache.pinot.query.runtime.plan;
-import java.util.List;
-import java.util.Map;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.routing.WorkerMetadata;
/**
- * {@code DistributedStagePlan} is the deserialized version of the
- * {@link org.apache.pinot.common.proto.Worker.StagePlan}.
+ * {@code StagePlan} is the deserialized version of the {@link org.apache.pinot.common.proto.Worker.StagePlan}.
*
* <p>It is also the extended version of the {@link org.apache.pinot.core.query.request.ServerQueryRequest}.
*/
-public class DistributedStagePlan {
+public class StagePlan {
private final int _stageId;
- private final VirtualServerAddress _server;
- private final PlanNode _stageRoot;
+ private final PlanNode _rootNode;
private final StageMetadata _stageMetadata;
- public DistributedStagePlan(int stageId, VirtualServerAddress server, PlanNode stageRoot,
- StageMetadata stageMetadata) {
+ public StagePlan(int stageId, PlanNode rootNode, StageMetadata stageMetadata) {
_stageId = stageId;
- _server = server;
- _stageRoot = stageRoot;
+ _rootNode = rootNode;
_stageMetadata = stageMetadata;
}
@@ -49,25 +41,11 @@ public class DistributedStagePlan {
return _stageId;
}
- public VirtualServerAddress getServer() {
- return _server;
- }
-
- public PlanNode getStageRoot() {
- return _stageRoot;
+ public PlanNode getRootNode() {
+ return _rootNode;
}
public StageMetadata getStageMetadata() {
return _stageMetadata;
}
-
- public WorkerMetadata getCurrentWorkerMetadata() {
- return _stageMetadata.getWorkerMetadataList().get(_server.workerId());
- }
-
- public static boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
- WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
- Map<String, List<String>> segments = WorkerMetadata.getTableSegmentsMap(workerMetadata);
- return segments != null && segments.size() > 0;
- }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 3db86807d7..aec7998e16 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -29,13 +29,14 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.StagePlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,8 @@ public class PipelineBreakerExecutor {
*
* @param scheduler scheduler service to run the pipeline breaker main thread.
* @param mailboxService mailbox service to attach the {@link MailboxReceiveNode} against.
- * @param distributedStagePlan the distributed stage plan to run pipeline breaker on.
+ * @param workerMetadata worker metadata for the current worker.
+ * @param stagePlan the distributed stage plan to run pipeline breaker on.
* @param opChainMetadata request metadata, including query options
* @param requestId request ID
* @param deadlineMs execution deadline
@@ -65,23 +67,22 @@ public class PipelineBreakerExecutor {
*/
@Nullable
public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
- MailboxService mailboxService, DistributedStagePlan distributedStagePlan, Map<String, String> opChainMetadata,
- long requestId, long deadlineMs) {
+ MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan,
+ Map<String, String> opChainMetadata, long requestId, long deadlineMs) {
PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
- PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
+ PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(), pipelineBreakerContext);
if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
try {
// TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage
// OpChain receive-mail callbacks.
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(mailboxService, requestId, distributedStagePlan.getStageId(),
- distributedStagePlan.getServer(), deadlineMs, opChainMetadata, distributedStagePlan.getStageMetadata(),
- null);
+ new OpChainExecutionContext(mailboxService, requestId, stagePlan.getStageId(), deadlineMs, opChainMetadata,
+ stagePlan.getStageMetadata(), workerMetadata, null);
return execute(scheduler, pipelineBreakerContext, opChainExecutionContext);
} catch (Exception e) {
LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId,
- distributedStagePlan.getStageId(), e);
+ stagePlan.getStageId(), e);
return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), Collections.emptyMap(),
TransferableBlockUtils.getErrorTransferableBlock(e), null);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index f4b34a145a..fbfb9487b4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -18,152 +18,86 @@
*/
package org.apache.pinot.query.runtime.plan.serde;
-import java.util.ArrayList;
-import java.util.HashMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.commons.lang.StringUtils;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
/**
* This utility class serialize/deserialize between {@link Worker.StagePlan} elements to Planner elements.
*/
public class QueryPlanSerDeUtils {
- private static final Pattern VIRTUAL_SERVER_PATTERN =
- Pattern.compile("(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");
-
private QueryPlanSerDeUtils() {
- // do not instantiate.
- }
-
- public static VirtualServerAddress protoToAddress(String virtualAddressStr) {
- Matcher matcher = VIRTUAL_SERVER_PATTERN.matcher(virtualAddressStr);
- if (!matcher.matches()) {
- throw new IllegalArgumentException("Unexpected virtualAddressStr '" + virtualAddressStr + "'. This might "
- + "happen if you are upgrading from an old version of the multistage engine to the current one in a rolling "
- + "fashion.");
- }
-
- // Skipped netty and grpc port as they are not used in worker instance.
- return new VirtualServerAddress(matcher.group("host"), Integer.parseInt(matcher.group("port")),
- Integer.parseInt(matcher.group("virtualid")));
}
- public static String addressToProto(VirtualServerAddress serverAddress) {
- return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port());
+ public static StagePlan fromProtoStagePlan(Worker.StagePlan protoStagePlan)
+ throws InvalidProtocolBufferException {
+ AbstractPlanNode rootNode =
+ StageNodeSerDeUtils.deserializeStageNode(Plan.StageNode.parseFrom(protoStagePlan.getRootNode()));
+ StageMetadata stageMetadata = fromProtoStageMetadata(protoStagePlan.getStageMetadata());
+ return new StagePlan(protoStagePlan.getStageId(), rootNode, stageMetadata);
}
- public static List<DistributedStagePlan> deserializeStagePlan(Worker.StagePlan stagePlan) {
- int stageId = stagePlan.getStageId();
- Worker.StageMetadata protoStageMetadata = stagePlan.getStageMetadata();
- String serverAddress = protoStageMetadata.getServerAddress();
- String[] hostPort = StringUtils.split(serverAddress, ':');
- String hostname = hostPort[0];
- int port = Integer.parseInt(hostPort[1]);
- AbstractPlanNode stageRoot = StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot());
- StageMetadata stageMetadata = fromProtoStageMetadata(protoStageMetadata);
- List<Integer> workerIds = protoStageMetadata.getWorkerIdsList();
- List<DistributedStagePlan> distributedStagePlans = new ArrayList<>(workerIds.size());
- for (int workerId : workerIds) {
- distributedStagePlans.add(
- new DistributedStagePlan(stageId, new VirtualServerAddress(hostname, port, workerId), stageRoot,
- stageMetadata));
- }
- return distributedStagePlans;
- }
-
- private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
- StageMetadata.Builder builder = new StageMetadata.Builder();
- List<WorkerMetadata> workerMetadataList = new ArrayList<>();
- for (Worker.WorkerMetadata protoWorkerMetadata : protoStageMetadata.getWorkerMetadataList()) {
- workerMetadataList.add(fromProtoWorkerMetadata(protoWorkerMetadata));
- }
- builder.setWorkerMetadataList(workerMetadataList);
- builder.putAllCustomProperties(protoStageMetadata.getCustomPropertyMap());
- return builder.build();
+ private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata)
+ throws InvalidProtocolBufferException {
+ List<WorkerMetadata> workerMetadataList =
+ protoStageMetadata.getWorkerMetadataList().stream().map(QueryPlanSerDeUtils::fromProtoWorkerMetadata)
+ .collect(Collectors.toList());
+ Map<String, String> customProperties = fromProtoProperties(protoStageMetadata.getCustomProperty());
+ return new StageMetadata(workerMetadataList, customProperties);
}
private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) {
- WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
- builder.setVirtualServerAddress(protoToAddress(protoWorkerMetadata.getVirtualAddress()));
- builder.putAllMailBoxInfosMap(fromProtoMailboxMetadataMap(protoWorkerMetadata.getMailboxMetadataMap()));
- builder.putAllCustomProperties(protoWorkerMetadata.getCustomPropertyMap());
- return builder.build();
- }
-
- private static Map<Integer, MailboxMetadata> fromProtoMailboxMetadataMap(
- Map<Integer, Worker.MailboxMetadata> mailboxMetadataMap) {
- Map<Integer, MailboxMetadata> mailboxMap = new HashMap<>();
- for (Map.Entry<Integer, Worker.MailboxMetadata> entry : mailboxMetadataMap.entrySet()) {
- mailboxMap.put(entry.getKey(), fromProtoMailbox(entry.getValue()));
- }
- return mailboxMap;
+ VirtualServerAddress virtualAddress = VirtualServerAddress.parse(protoWorkerMetadata.getVirtualAddress());
+ Map<Integer, MailboxMetadata> mailboxMetadataMap = protoWorkerMetadata.getMailboxMetadataMap().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> fromProtoMailbox(e.getValue())));
+ return new WorkerMetadata(virtualAddress, mailboxMetadataMap, protoWorkerMetadata.getCustomPropertyMap());
}
private static MailboxMetadata fromProtoMailbox(Worker.MailboxMetadata protoMailboxMetadata) {
- List<String> mailboxIds = new ArrayList<>();
- List<VirtualServerAddress> virtualAddresses = new ArrayList<>();
- for (int i = 0; i < protoMailboxMetadata.getMailboxIdCount(); i++) {
- mailboxIds.add(protoMailboxMetadata.getMailboxId(i));
- virtualAddresses.add(protoToAddress(protoMailboxMetadata.getVirtualAddress(i)));
- }
- MailboxMetadata mailboxMetadata =
- new MailboxMetadata(mailboxIds, virtualAddresses, protoMailboxMetadata.getCustomPropertyMap());
- return mailboxMetadata;
+ List<VirtualServerAddress> virtualAddresses =
+ protoMailboxMetadata.getVirtualAddressList().stream().map(VirtualServerAddress::parse)
+ .collect(Collectors.toList());
+ return new MailboxMetadata(protoMailboxMetadata.getMailboxIdList(), virtualAddresses);
}
- public static Worker.StageMetadata toProtoStageMetadata(List<Worker.WorkerMetadata> workerMetadataList,
- Map<String, String> customProperties, QueryServerInstance serverInstance, List<Integer> workerIds) {
- return Worker.StageMetadata.newBuilder().addAllWorkerMetadata(workerMetadataList)
- .putAllCustomProperty(customProperties)
- .setServerAddress(String.format("%s:%d", serverInstance.getHostname(), serverInstance.getQueryMailboxPort()))
- .addAllWorkerIds(workerIds).build();
+ public static Map<String, String> fromProtoProperties(ByteString protoProperties)
+ throws InvalidProtocolBufferException {
+ return Worker.Properties.parseFrom(protoProperties).getPropertyMap();
}
- public static List<Worker.WorkerMetadata> toProtoWorkerMetadataList(DispatchablePlanFragment planFragment) {
- List<WorkerMetadata> workerMetadataList = planFragment.getWorkerMetadataList();
- List<Worker.WorkerMetadata> protoWorkerMetadataList = new ArrayList<>(workerMetadataList.size());
- for (WorkerMetadata workerMetadata : workerMetadataList) {
- protoWorkerMetadataList.add(toProtoWorkerMetadata(workerMetadata));
- }
- return protoWorkerMetadataList;
+ public static List<Worker.WorkerMetadata> toProtoWorkerMetadataList(List<WorkerMetadata> workerMetadataList) {
+ return workerMetadataList.stream().map(QueryPlanSerDeUtils::toProtoWorkerMetadata).collect(Collectors.toList());
}
private static Worker.WorkerMetadata toProtoWorkerMetadata(WorkerMetadata workerMetadata) {
- Worker.WorkerMetadata.Builder builder = Worker.WorkerMetadata.newBuilder();
- builder.setVirtualAddress(addressToProto(workerMetadata.getVirtualServerAddress()));
- builder.putAllMailboxMetadata(toProtoMailboxMap(workerMetadata.getMailBoxInfosMap()));
- builder.putAllCustomProperty(workerMetadata.getCustomProperties());
- return builder.build();
+ Map<Integer, Worker.MailboxMetadata> protoMailboxMetadataMap =
+ workerMetadata.getMailboxMetadataMap().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> toProtoMailboxMetadata(e.getValue())));
+ return Worker.WorkerMetadata.newBuilder().setVirtualAddress(workerMetadata.getVirtualAddress().toString())
+ .putAllMailboxMetadata(protoMailboxMetadataMap).putAllCustomProperty(workerMetadata.getCustomProperties())
+ .build();
}
- private static Map<Integer, Worker.MailboxMetadata> toProtoMailboxMap(Map<Integer, MailboxMetadata> mailBoxInfosMap) {
- Map<Integer, Worker.MailboxMetadata> mailboxMetadataMap = new HashMap<>();
- for (Map.Entry<Integer, MailboxMetadata> entry : mailBoxInfosMap.entrySet()) {
- mailboxMetadataMap.put(entry.getKey(), toProtoMailbox(entry.getValue()));
- }
- return mailboxMetadataMap;
+ private static Worker.MailboxMetadata toProtoMailboxMetadata(MailboxMetadata mailboxMetadata) {
+ List<String> virtualAddresses =
+ mailboxMetadata.getVirtualAddresses().stream().map(VirtualServerAddress::toString).collect(Collectors.toList());
+ return Worker.MailboxMetadata.newBuilder().addAllMailboxId(mailboxMetadata.getMailboxIds())
+ .addAllVirtualAddress(virtualAddresses).build();
}
- private static Worker.MailboxMetadata toProtoMailbox(MailboxMetadata mailboxMetadata) {
- Worker.MailboxMetadata.Builder builder = Worker.MailboxMetadata.newBuilder();
- for (int i = 0; i < mailboxMetadata.getMailBoxIdList().size(); i++) {
- builder.addMailboxId(mailboxMetadata.getMailBoxId(i));
- builder.addVirtualAddress(mailboxMetadata.getVirtualAddress(i).toString());
- }
- builder.putAllCustomProperty(mailboxMetadata.getCustomProperties());
- return builder.build();
+ public static ByteString toProtoProperties(Map<String, String> properties) {
+ return Worker.Properties.newBuilder().putAllProperty(properties).build().toByteString();
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 33a955f709..3c03fa1539 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -24,19 +24,19 @@ import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
/**
- * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
+ * Context class for converting a {@link StagePlan} into
* {@link PinotQuery} to execute on server.
*
* On leaf-stage server node, {@link PlanNode} are split into {@link PinotQuery} part and
* {@link org.apache.pinot.query.runtime.operator.OpChain} part.
*/
public class ServerPlanRequestContext {
- private final DistributedStagePlan _stagePlan;
+ private final StagePlan _stagePlan;
private final QueryExecutor _leafQueryExecutor;
private final ExecutorService _executorService;
private final PipelineBreakerResult _pipelineBreakerResult;
@@ -45,7 +45,7 @@ public class ServerPlanRequestContext {
private PlanNode _leafStageBoundaryNode;
private List<ServerQueryRequest> _serverQueryRequests;
- public ServerPlanRequestContext(DistributedStagePlan stagePlan, QueryExecutor leafQueryExecutor,
+ public ServerPlanRequestContext(StagePlan stagePlan, QueryExecutor leafQueryExecutor,
ExecutorService executorService, PipelineBreakerResult pipelineBreakerResult) {
_stagePlan = stagePlan;
_leafQueryExecutor = leafQueryExecutor;
@@ -54,7 +54,7 @@ public class ServerPlanRequestContext {
_pinotQuery = new PinotQuery();
}
- public DistributedStagePlan getStagePlan() {
+ public StagePlan getStagePlan() {
return _stagePlan;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 832b2a4666..4c504f71d6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -47,12 +47,11 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -64,8 +63,6 @@ import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQuer
import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ServerPlanRequestUtils {
@@ -73,7 +70,6 @@ public class ServerPlanRequestUtils {
}
private static final int DEFAULT_LEAF_NODE_LIMIT = Integer.MAX_VALUE;
- private static final Logger LOGGER = LoggerFactory.getLogger(ServerPlanRequestUtils.class);
private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
ImmutableList.of(PredicateComparisonRewriter.class.getName(),
NonAggregationGroupByToDistinctQueryRewriter.class.getName());
@@ -82,30 +78,29 @@ public class ServerPlanRequestUtils {
private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
/**
- * main entry point for compiling leaf-stage {@link DistributedStagePlan}.
+ * main entry point for compiling leaf-stage {@link StagePlan}.
*
* @param executionContext the execution context used by the leaf-stage execution engine.
- * @param distributedStagePlan the distribute stage plan on the leaf.
+ * @param stagePlan the distribute stage plan on the leaf.
* @return an opChain that executes the leaf-stage, with the leaf-stage execution encapsulated within.
*/
- public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
- DistributedStagePlan distributedStagePlan, HelixManager helixManager, ServerMetrics serverMetrics,
- QueryExecutor leafQueryExecutor, ExecutorService executorService) {
+ public static OpChain compileLeafStage(OpChainExecutionContext executionContext, StagePlan stagePlan,
+ HelixManager helixManager, ServerMetrics serverMetrics, QueryExecutor leafQueryExecutor,
+ ExecutorService executorService) {
long queryArrivalTimeMs = System.currentTimeMillis();
- ServerPlanRequestContext serverContext = new ServerPlanRequestContext(distributedStagePlan, leafQueryExecutor,
- executorService, executionContext.getPipelineBreakerResult());
+ ServerPlanRequestContext serverContext = new ServerPlanRequestContext(stagePlan, leafQueryExecutor, executorService,
+ executionContext.getPipelineBreakerResult());
// 1. compile the PinotQuery
constructPinotQueryPlan(serverContext, executionContext.getOpChainMetadata());
// 2. convert PinotQuery into InstanceRequest list (one for each physical table)
List<InstanceRequest> instanceRequestList =
- ServerPlanRequestUtils.constructServerQueryRequests(executionContext, serverContext, distributedStagePlan,
- helixManager.getHelixPropertyStore());
+ constructServerQueryRequests(executionContext, serverContext, helixManager.getHelixPropertyStore());
serverContext.setServerQueryRequests(instanceRequestList.stream()
.map(instanceRequest -> new ServerQueryRequest(instanceRequest, serverMetrics, queryArrivalTimeMs, true))
.collect(Collectors.toList()));
// compile the OpChain
executionContext.setLeafStageContext(serverContext);
- return PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
+ return PhysicalPlanVisitor.walkPlanNode(stagePlan.getRootNode(), executionContext);
}
/**
@@ -117,18 +112,13 @@ public class ServerPlanRequestUtils {
*/
private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext,
Map<String, String> requestMetadata) {
- DistributedStagePlan stagePlan = serverContext.getStagePlan();
+ StagePlan stagePlan = serverContext.getStagePlan();
PinotQuery pinotQuery = serverContext.getPinotQuery();
- pinotQuery.setExplain(false);
// attach leaf node limit it not set
Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadata);
- if (leafNodeLimit != null) {
- pinotQuery.setLimit(leafNodeLimit);
- } else {
- pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
- }
+ pinotQuery.setLimit(leafNodeLimit != null ? leafNodeLimit : DEFAULT_LEAF_NODE_LIMIT);
// visit the plan and create PinotQuery and determine the leaf stage boundary PlanNode.
- ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
+ ServerPlanRequestVisitor.walkStageNode(stagePlan.getRootNode(), serverContext);
}
/**
@@ -139,17 +129,16 @@ public class ServerPlanRequestUtils {
* @return a list of server instance request to be run.
*/
public static List<InstanceRequest> constructServerQueryRequests(OpChainExecutionContext executionContext,
- ServerPlanRequestContext serverContext, DistributedStagePlan distributedStagePlan,
- ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
- StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
- WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
- String rawTableName = StageMetadata.getTableName(stageMetadata);
- int stageId = distributedStagePlan.getStageId();
- Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
- List<InstanceRequest> requests = new ArrayList<>();
- for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
- String tableType = tableEntry.getKey();
- List<String> segmentList = tableEntry.getValue();
+ ServerPlanRequestContext serverContext, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
+ int stageId = executionContext.getStageId();
+ StageMetadata stageMetadata = executionContext.getStageMetadata();
+ String rawTableName = stageMetadata.getTableName();
+ Map<String, List<String>> tableSegmentsMap = executionContext.getWorkerMetadata().getTableSegmentsMap();
+ assert tableSegmentsMap != null;
+ List<InstanceRequest> requests = new ArrayList<>(tableSegmentsMap.size());
+ for (Map.Entry<String, List<String>> entry : tableSegmentsMap.entrySet()) {
+ String tableType = entry.getKey();
+ List<String> segments = entry.getValue();
// ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box
// network traffic. but there's chance to improve this:
// TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
@@ -158,15 +147,15 @@ public class ServerPlanRequestUtils {
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
- requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
- tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, segmentList));
+ requests.add(compileInstanceRequest(executionContext, serverContext, stageId, tableConfig, schema,
+ stageMetadata.getTimeBoundary(), TableType.OFFLINE, segments));
} else if (TableType.REALTIME.name().equals(tableType)) {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
- requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
- tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, segmentList));
+ requests.add(compileInstanceRequest(executionContext, serverContext, stageId, tableConfig, schema,
+ stageMetadata.getTimeBoundary(), TableType.REALTIME, segments));
} else {
throw new IllegalArgumentException("Unsupported table type key: " + tableType);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 405f619a9b..06df28f561 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.service.dispatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,7 +39,6 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.calcite.util.Pair;
import org.apache.pinot.common.datablock.DataBlock;
-import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -111,41 +111,43 @@ public class QueryDispatcher {
void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map<String, String> queryOptions)
throws Exception {
Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
+
+ // Serialize the stage plans in parallel
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
+ Set<QueryServerInstance> serverInstances = new HashSet<>();
// Ignore the reduce stage (stage 0)
int numStages = stagePlans.size() - 1;
- Set<QueryServerInstance> serverInstances = new HashSet<>();
- // Serialize the stage plans in parallel
- Plan.StageNode[] stageRootNodes = new Plan.StageNode[numStages];
- //noinspection unchecked
- List<Worker.WorkerMetadata>[] stageWorkerMetadataLists = new List[numStages];
- CompletableFuture<?>[] stagePlanSerializationStubs = new CompletableFuture[2 * numStages];
+ List<CompletableFuture<StageInfo>> stageInfoFutures = new ArrayList<>(numStages);
for (int i = 0; i < numStages; i++) {
DispatchablePlanFragment stagePlan = stagePlans.get(i + 1);
serverInstances.addAll(stagePlan.getServerInstanceToWorkerIdMap().keySet());
- int finalI = i;
- stagePlanSerializationStubs[2 * i] = CompletableFuture.runAsync(() -> stageRootNodes[finalI] =
- StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) stagePlan.getPlanFragment().getFragmentRoot()),
- _executorService);
- stagePlanSerializationStubs[2 * i + 1] = CompletableFuture.runAsync(
- () -> stageWorkerMetadataLists[finalI] = QueryPlanSerDeUtils.toProtoWorkerMetadataList(stagePlan),
- _executorService);
+ stageInfoFutures.add(CompletableFuture.supplyAsync(() -> {
+ ByteString rootNode =
+ StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) stagePlan.getPlanFragment().getFragmentRoot())
+ .toByteString();
+ ByteString customProperty = QueryPlanSerDeUtils.toProtoProperties(stagePlan.getCustomProperties());
+ return new StageInfo(rootNode, customProperty);
+ }, _executorService));
}
+ List<StageInfo> stageInfos = new ArrayList<>(numStages);
try {
- CompletableFuture.allOf(stagePlanSerializationStubs)
- .get(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+ for (CompletableFuture<StageInfo> future : stageInfoFutures) {
+ stageInfos.add(future.get(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS));
+ }
} finally {
- for (CompletableFuture<?> future : stagePlanSerializationStubs) {
+ for (CompletableFuture<?> future : stageInfoFutures) {
if (!future.isDone()) {
future.cancel(true);
}
}
}
+
Map<String, String> requestMetadata = new HashMap<>();
requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId));
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
requestMetadata.putAll(queryOptions);
+ ByteString protoRequestMetadata = QueryPlanSerDeUtils.toProtoProperties(requestMetadata);
// Submit the query plan to all servers in parallel
int numServers = serverInstances.size();
@@ -159,13 +161,23 @@ public class QueryDispatcher {
DispatchablePlanFragment stagePlan = stagePlans.get(stageId);
List<Integer> workerIds = stagePlan.getServerInstanceToWorkerIdMap().get(serverInstance);
if (workerIds != null) {
+ List<WorkerMetadata> stageWorkerMetadataList = stagePlan.getWorkerMetadataList();
+ List<WorkerMetadata> workerMetadataList = new ArrayList<>(workerIds.size());
+ for (int workerId : workerIds) {
+ workerMetadataList.add(stageWorkerMetadataList.get(workerId));
+ }
+ List<Worker.WorkerMetadata> protoWorkerMetadataList =
+ QueryPlanSerDeUtils.toProtoWorkerMetadataList(workerMetadataList);
+ StageInfo stageInfo = stageInfos.get(i);
+ Worker.StageMetadata stageMetadata =
+ Worker.StageMetadata.newBuilder().addAllWorkerMetadata(protoWorkerMetadataList)
+ .setCustomProperty(stageInfo._customProperty).build();
requestBuilder.addStagePlan(
- Worker.StagePlan.newBuilder().setStageId(stageId).setStageRoot(stageRootNodes[i]).setStageMetadata(
- QueryPlanSerDeUtils.toProtoStageMetadata(stageWorkerMetadataLists[i],
- stagePlan.getCustomProperties(), serverInstance, workerIds)).build());
+ Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(stageInfo._rootNode)
+ .setStageMetadata(stageMetadata).build());
}
}
- requestBuilder.putAllMetadata(requestMetadata);
+ requestBuilder.setMetadata(protoRequestMetadata);
getOrCreateDispatchClient(serverInstance).submit(requestBuilder.build(), serverInstance, deadline,
dispatchCallbacks::offer);
} catch (Throwable t) {
@@ -204,6 +216,16 @@ public class QueryDispatcher {
}
}
+ private static class StageInfo {
+ final ByteString _rootNode;
+ final ByteString _customProperty;
+
+ private StageInfo(ByteString rootNode, ByteString customProperty) {
+ _rootNode = rootNode;
+ _customProperty = customProperty;
+ }
+ }
+
private void cancel(long requestId, DispatchableSubPlan dispatchableSubPlan) {
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
int numStages = stagePlans.size();
@@ -233,21 +255,19 @@ public class QueryDispatcher {
Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap,
MailboxService mailboxService) {
// NOTE: Reduce stage is always stage 0
- DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(0);
- PlanFragment planFragment = dispatchablePlanFragment.getPlanFragment();
+ DispatchablePlanFragment dispatchableStagePlan = dispatchableSubPlan.getQueryStageList().get(0);
+ PlanFragment planFragment = dispatchableStagePlan.getPlanFragment();
PlanNode rootNode = planFragment.getFragmentRoot();
Preconditions.checkState(rootNode instanceof MailboxReceiveNode,
"Expecting mailbox receive node as root of reduce stage, got: %s", rootNode.getClass().getSimpleName());
MailboxReceiveNode receiveNode = (MailboxReceiveNode) rootNode;
- List<WorkerMetadata> workerMetadataList = dispatchablePlanFragment.getWorkerMetadataList();
+ List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
Preconditions.checkState(workerMetadataList.size() == 1, "Expecting single worker for reduce stage, got: %s",
workerMetadataList.size());
- StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(workerMetadataList)
- .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();
+ StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
OpChainExecutionContext opChainExecutionContext =
new OpChainExecutionContext(mailboxService, requestId, planFragment.getFragmentId(),
- workerMetadataList.get(0).getVirtualServerAddress(), System.currentTimeMillis() + timeoutMs, queryOptions,
- stageMetadata, null);
+ System.currentTimeMillis() + timeoutMs, queryOptions, stageMetadata, workerMetadataList.get(0), null);
MailboxReceiveOperator receiveOperator =
new MailboxReceiveOperator(opChainExecutionContext, receiveNode.getDistributionType(),
receiveNode.getSenderStageId());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index ecfa9b09f8..2e52c28a5a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -31,8 +31,10 @@ import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -95,31 +97,43 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
@Override
public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
- Map<String, String> requestMetadata = request.getMetadataMap();
+ Map<String, String> requestMetadata;
+ try {
+ requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while deserializing request metadata", e);
+ responseObserver.onNext(Worker.QueryResponse.newBuilder()
+ .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
+ QueryException.getTruncatedStackTrace(e)).build());
+ responseObserver.onCompleted();
+ return;
+ }
long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
- List<Worker.StagePlan> stagePlans = request.getStagePlanList();
- int numStages = stagePlans.size();
+ List<Worker.StagePlan> protoStagePlans = request.getStagePlanList();
+ int numStages = protoStagePlans.size();
CompletableFuture<?>[] stageSubmissionStubs = new CompletableFuture[numStages];
for (int i = 0; i < numStages; i++) {
- Worker.StagePlan stagePlan = stagePlans.get(i);
+ Worker.StagePlan protoStagePlan = protoStagePlans.get(i);
stageSubmissionStubs[i] = CompletableFuture.runAsync(() -> {
- List<DistributedStagePlan> workerPlans;
+ StagePlan stagePlan;
try {
- workerPlans = QueryPlanSerDeUtils.deserializeStagePlan(stagePlan);
+ stagePlan = QueryPlanSerDeUtils.fromProtoStagePlan(protoStagePlan);
} catch (Exception e) {
throw new RuntimeException(
- String.format("Caught exception while deserializing stage plan for request: %d, stage id: %d", requestId,
- stagePlan.getStageId()), e);
+ String.format("Caught exception while deserializing stage plan for request: %d, stage: %d", requestId,
+ protoStagePlan.getStageId()), e);
}
- int numWorkers = workerPlans.size();
+ StageMetadata stageMetadata = stagePlan.getStageMetadata();
+ List<WorkerMetadata> workerMetadataList = stageMetadata.getWorkerMetadataList();
+ int numWorkers = workerMetadataList.size();
CompletableFuture<?>[] workerSubmissionStubs = new CompletableFuture[numWorkers];
for (int j = 0; j < numWorkers; j++) {
- DistributedStagePlan workerPlan = workerPlans.get(j);
+ WorkerMetadata workerMetadata = workerMetadataList.get(j);
workerSubmissionStubs[j] =
- CompletableFuture.runAsync(() -> _queryRunner.processQuery(workerPlan, requestMetadata),
+ CompletableFuture.runAsync(() -> _queryRunner.processQuery(workerMetadata, stagePlan, requestMetadata),
_querySubmissionExecutorService);
}
try {
@@ -127,8 +141,8 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
.get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new RuntimeException(
- String.format("Caught exception while submitting request: %d, stage id: %d", requestId,
- stagePlan.getStageId()), e);
+ String.format("Caught exception while submitting request: %d, stage: %d", requestId,
+ protoStagePlan.getStageId()), e);
} finally {
for (CompletableFuture<?> future : workerSubmissionStubs) {
if (!future.isDone()) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index b4b1dff3cc..1811218eda 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -27,8 +27,9 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.data.Schema;
@@ -109,9 +110,9 @@ public class QueryServerEnclosure {
_queryRunner.shutDown();
}
- public CompletableFuture<Void> processQuery(DistributedStagePlan distributedStagePlan,
+ public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan,
Map<String, String> requestMetadataMap) {
- return CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadataMap),
+ return CompletableFuture.runAsync(() -> _queryRunner.processQuery(workerMetadata, stagePlan, requestMetadataMap),
_queryRunner.getExecutorService());
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index 8e6b563ac4..3f20d33956 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
@@ -299,7 +300,8 @@ public class MailboxServiceTest {
SendingMailbox sendingMailbox =
_mailboxService1.getSendingMailbox("localhost", _mailboxService1.getPort(), mailboxId, Long.MAX_VALUE);
ReceivingMailbox receivingMailbox = _mailboxService1.getReceivingMailbox(mailboxId);
- receivingMailbox.registeredReader(() -> { });
+ receivingMailbox.registeredReader(() -> {
+ });
// send a block
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
@@ -591,15 +593,16 @@ public class MailboxServiceTest {
SendingMailbox sendingMailbox =
_mailboxService2.getSendingMailbox("localhost", _mailboxService1.getPort(), mailboxId, Long.MAX_VALUE);
ReceivingMailbox receivingMailbox = _mailboxService1.getReceivingMailbox(mailboxId);
- receivingMailbox.registeredReader(() -> { });
+ receivingMailbox.registeredReader(() -> {
+ });
// send a block
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
// receiving-side early terminates after pulling the first block
TestUtils.waitForCondition(aVoid -> {
- TransferableBlock block = receivingMailbox.poll();
- return block != null && block.getNumRows() == 1;
- }, 1000L, "Failed to deliver mails");
+ TransferableBlock block = receivingMailbox.poll();
+ return block != null && block.getNumRows() == 1;
+ }, 1000L, "Failed to deliver mails");
receivingMailbox.earlyTerminate();
// send another block b/c it doesn't guarantee the next block must be EOS
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index e79f46e671..c34c858e76 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -18,17 +18,20 @@
*/
package org.apache.pinot.query.runtime.executor;
-import java.util.Collections;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -67,9 +70,10 @@ public class OpChainSchedulerServiceTest {
}
private OpChain getChain(MultiStageOperator operator) {
- VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
- OpChainExecutionContext context =
- new OpChainExecutionContext(null, 123L, 1, address, Long.MAX_VALUE, Collections.emptyMap(), null, null);
+ WorkerMetadata workerMetadata =
+ new WorkerMetadata(new VirtualServerAddress("localhost", 123, 0), ImmutableMap.of(), ImmutableMap.of());
+ OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, Long.MAX_VALUE, ImmutableMap.of(),
+ new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
return new OpChain(context, operator);
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 9f5b8dfffe..1a2949b142 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -26,9 +26,9 @@ import java.util.stream.Stream;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -39,6 +39,7 @@ import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -51,12 +52,14 @@ import static org.testng.Assert.assertTrue;
public class MailboxReceiveOperatorTest {
- private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
private static final DataSchema DATA_SCHEMA =
new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 1, 1, 0, 0);
+ private StageMetadata _stageMetadataBoth;
+ private StageMetadata _stageMetadata1;
+
private AutoCloseable _mocks;
@Mock
private MailboxService _mailboxService;
@@ -64,40 +67,32 @@ public class MailboxReceiveOperatorTest {
private ReceivingMailbox _mailbox1;
@Mock
private ReceivingMailbox _mailbox2;
- private StageMetadata _stageMetadataBoth;
- private StageMetadata _stageMetadata1;
- @BeforeMethod
+ @BeforeClass
public void setUp() {
+ VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
+ VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
+ _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0,
+ new MailboxMetadata(
+ ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2)), 1, new MailboxMetadata(
+ ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of());
+ _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))),
+ ImmutableMap.of())), ImmutableMap.of());
+ }
+
+ @BeforeMethod
+ public void setUpMethod() {
_mocks = MockitoAnnotations.openMocks(this);
when(_mailboxService.getHostname()).thenReturn("localhost");
when(_mailboxService.getPort()).thenReturn(123);
- VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
- VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
- _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
- .addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of()))
- .addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of()))
- .build()).collect(Collectors.toList())).build();
- _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
- .addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of()))
- .addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of()))
- .build()).collect(Collectors.toList())).build();
}
@AfterMethod
- public void tearDown()
+ public void tearDownMethod()
throws Exception {
_mocks.close();
}
@@ -105,7 +100,7 @@ public class MailboxReceiveOperatorTest {
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
//noinspection resource
new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1);
}
@@ -116,8 +111,7 @@ public class MailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 1000L,
- _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, System.currentTimeMillis() + 1000L, _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
Thread.sleep(100L);
TransferableBlock block = receiveOp.nextBlock();
@@ -132,7 +126,7 @@ public class MailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -146,7 +140,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
assertEquals(actualRows.size(), 1);
@@ -163,7 +157,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
TransferableBlock block = receiveOp.nextBlock();
assertTrue(block.isErrorBlock());
@@ -181,7 +175,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -204,7 +198,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
// Receive first block from server1
@@ -229,7 +223,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
TransferableBlock block = receiveOp.nextBlock();
@@ -251,7 +245,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
// Receive first block from server1
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 7a49dcf16a..86b2ac0000 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -18,7 +18,8 @@
*/
package org.apache.pinot.query.runtime.operator;
-import java.util.Collections;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -50,7 +51,6 @@ public class MailboxSendOperatorTest {
private static final int SENDER_STAGE_ID = 1;
private AutoCloseable _mocks;
-
@Mock
private VirtualServerAddress _server;
@Mock
@@ -61,8 +61,7 @@ public class MailboxSendOperatorTest {
private BlockExchange _exchange;
@BeforeMethod
- public void setUp()
- throws Exception {
+ public void setUpMethod() {
_mocks = openMocks(this);
when(_server.hostname()).thenReturn("mock");
when(_server.port()).thenReturn(0);
@@ -70,7 +69,7 @@ public class MailboxSendOperatorTest {
}
@AfterMethod
- public void tearDown()
+ public void tearDownMethod()
throws Exception {
_mocks.close();
}
@@ -199,11 +198,11 @@ public class MailboxSendOperatorTest {
}
private MailboxSendOperator getMailboxSendOperator() {
- StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
- Collections.singletonList(new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
+ WorkerMetadata workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(), ImmutableMap.of());
+ StageMetadata stageMetadata = new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE,
- Collections.emptyMap(), stageMetadata, null);
+ new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, Long.MAX_VALUE, ImmutableMap.of(),
+ stageMetadata, workerMetadata, null);
return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index fb937e6238..a74dec4e6f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -22,15 +22,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datatable.DataTable;
@@ -77,8 +74,17 @@ public class OpChainTest {
private static int _numOperatorsInitialized = 0;
private final List<TransferableBlock> _blockList = new ArrayList<>();
- private final ExecutorService _executorService = Executors.newCachedThreadPool();
+ private final ExecutorService _executor = Executors.newCachedThreadPool();
private final AtomicReference<LeafStageTransferableBlockOperator> _leafOpRef = new AtomicReference<>();
+ private final VirtualServerAddress _serverAddress = new VirtualServerAddress("localhost", 123, 0);
+ private final WorkerMetadata _workerMetadata = new WorkerMetadata(_serverAddress, ImmutableMap.of(0,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
+ ImmutableList.of(_serverAddress)), 1,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
+ ImmutableList.of(_serverAddress)), 2,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
+ ImmutableList.of(_serverAddress))), ImmutableMap.of());
+ private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of());
private AutoCloseable _mocks;
@Mock
@@ -94,22 +100,9 @@ public class OpChainTest {
@Mock
private BlockExchange _exchange;
- private VirtualServerAddress _serverAddress;
- private StageMetadata _receivingStageMetadata;
-
@BeforeMethod
public void setUpMethod() {
_mocks = MockitoAnnotations.openMocks(this);
- _serverAddress = new VirtualServerAddress("localhost", 123, 0);
- _receivingStageMetadata = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_serverAddress).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
- ImmutableMap.of())).addMailBoxInfoMap(1,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
- ImmutableMap.of())).addMailBoxInfoMap(2,
- new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
- ImmutableMap.of())).build()).collect(Collectors.toList())).build();
-
when(_mailboxService1.getReceivingMailbox(any())).thenReturn(_mailbox1);
when(_mailboxService2.getReceivingMailbox(any())).thenReturn(_mailbox2);
@@ -139,7 +132,7 @@ public class OpChainTest {
@AfterClass
public void tearDown() {
- _executorService.shutdown();
+ _executor.shutdown();
}
@Test
@@ -208,9 +201,8 @@ public class OpChainTest {
int receivedStageId = 2;
int senderStageId = 1;
- OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, Long.MAX_VALUE,
- Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), _receivingStageMetadata, null);
+ OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE,
+ ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -223,8 +215,8 @@ public class OpChainTest {
opChain.getStats().queued();
OpChainExecutionContext secondStageContext =
- new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, Long.MAX_VALUE,
- Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), _receivingStageMetadata, null);
+ new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE,
+ ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"), _stageMetadata, _workerMetadata, null);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -249,8 +241,8 @@ public class OpChainTest {
int receivedStageId = 2;
int senderStageId = 1;
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, Long.MAX_VALUE,
- Collections.emptyMap(), _receivingStageMetadata, null);
+ new OpChainExecutionContext(_mailboxService1, 1, senderStageId, Long.MAX_VALUE, ImmutableMap.of(),
+ _stageMetadata, _workerMetadata, null);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -261,8 +253,8 @@ public class OpChainTest {
opChain.getStats().queued();
OpChainExecutionContext secondStageContext =
- new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, Long.MAX_VALUE,
- Collections.emptyMap(), _receivingStageMetadata, null);
+ new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, Long.MAX_VALUE, ImmutableMap.of(),
+ _stageMetadata, _workerMetadata, null);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
@@ -296,19 +288,19 @@ public class OpChainTest {
}
QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT intCol FROM tbl");
- List<BaseResultsBlock> dataBlocks = Collections.singletonList(
+ List<BaseResultsBlock> dataBlocks = ImmutableList.of(
new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2}), queryContext));
InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new MetadataResultsBlock());
QueryExecutor queryExecutor = mockQueryExecutor(dataBlocks, metadataBlock);
LeafStageTransferableBlockOperator leafOp =
- new LeafStageTransferableBlockOperator(context, Collections.singletonList(mock(ServerQueryRequest.class)),
- upStreamSchema, queryExecutor, _executorService);
+ new LeafStageTransferableBlockOperator(context, ImmutableList.of(mock(ServerQueryRequest.class)),
+ upStreamSchema, queryExecutor, _executor);
_leafOpRef.set(leafOp);
//Transform operator
RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
TransformOperator transformOp =
- new TransformOperator(context, leafOp, upStreamSchema, Collections.singletonList(ref0), upStreamSchema);
+ new TransformOperator(context, leafOp, upStreamSchema, ImmutableList.of(ref0), upStreamSchema);
//Filter operator
RexExpression booleanLiteral = new RexExpression.Literal(ColumnDataType.BOOLEAN, 1);
@@ -377,7 +369,7 @@ public class OpChainTest {
@Override
public List<MultiStageOperator> getChildOperators() {
- return Collections.singletonList(_upstream);
+ return ImmutableList.of(_upstream);
}
@Override
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 5f139e5545..3c132269c7 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -18,15 +18,16 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -38,8 +39,8 @@ import org.apache.pinot.spi.utils.CommonConstants;
public class OperatorTestUtil {
// simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java.
private static final List<List<Object[]>> SIMPLE_KV_DATA_ROWS =
- Arrays.asList(Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}),
- Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"}));
+ ImmutableList.of(ImmutableList.of(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}),
+ ImmutableList.of(new Object[]{1, "AA"}, new Object[]{2, "Aa"}));
private static final MockDataBlockOperatorFactory MOCK_OPERATOR_FACTORY;
public static final DataSchema SIMPLE_KV_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"},
@@ -75,21 +76,24 @@ public class OperatorTestUtil {
return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW);
}
- public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService,
- VirtualServerAddress receiverAddress, long deadlineMs, StageMetadata stageMetadata) {
- return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, Collections.emptyMap(),
- stageMetadata, null);
+ public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService, long deadlineMs,
+ StageMetadata stageMetadata) {
+ return new OpChainExecutionContext(mailboxService, 0, 0, deadlineMs, ImmutableMap.of(), stageMetadata,
+ stageMetadata.getWorkerMetadataList().get(0), null);
}
public static OpChainExecutionContext getDefaultContext() {
- VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
- return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE,
- Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), null, null);
+ return getDefaultContext(ImmutableMap.of(CommonConstants.Broker.Request.TRACE, "true"));
}
public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
- VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
- return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Collections.emptyMap(), null,
- null);
+ return getDefaultContext(ImmutableMap.of());
+ }
+
+ private static OpChainExecutionContext getDefaultContext(Map<String, String> opChainMetadata) {
+ WorkerMetadata workerMetadata =
+ new WorkerMetadata(new VirtualServerAddress("mock", 80, 0), ImmutableMap.of(), ImmutableMap.of());
+ return new OpChainExecutionContext(null, 1, 2, Long.MAX_VALUE, opChainMetadata,
+ new StageMetadata(ImmutableList.of(workerMetadata), ImmutableMap.of()), workerMetadata, null);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 4de9e3d4c6..1e71018215 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -30,10 +30,10 @@ import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -44,6 +44,7 @@ import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -56,7 +57,6 @@ import static org.testng.Assert.assertTrue;
public class SortedMailboxReceiveOperatorTest {
- private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
private static final DataSchema DATA_SCHEMA =
new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
private static final List<RexExpression> COLLATION_KEYS = Collections.singletonList(new RexExpression.InputRef(0));
@@ -65,6 +65,9 @@ public class SortedMailboxReceiveOperatorTest {
private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 1, 1, 0, 0);
+ private StageMetadata _stageMetadataBoth;
+ private StageMetadata _stageMetadata1;
+
private AutoCloseable _mocks;
@Mock
private MailboxService _mailboxService;
@@ -73,40 +76,31 @@ public class SortedMailboxReceiveOperatorTest {
@Mock
private ReceivingMailbox _mailbox2;
- private StageMetadata _stageMetadataBoth;
- private StageMetadata _stageMetadata1;
+ @BeforeClass
+ public void setUp() {
+ VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
+ VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
+ _stageMetadataBoth = new StageMetadata(Stream.of(server1, server2).map(s -> new WorkerMetadata(s, ImmutableMap.of(0,
+ new MailboxMetadata(
+ ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2)), 1, new MailboxMetadata(
+ ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2))), ImmutableMap.of())).collect(Collectors.toList()), ImmutableMap.of());
+ _stageMetadata1 = new StageMetadata(ImmutableList.of(new WorkerMetadata(server1, ImmutableMap.of(0,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1)), 1,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(server1))),
+ ImmutableMap.of())), ImmutableMap.of());
+ }
@BeforeMethod
- public void setUp() {
+ public void setUpMethod() {
_mocks = MockitoAnnotations.openMocks(this);
when(_mailboxService.getHostname()).thenReturn("localhost");
when(_mailboxService.getPort()).thenReturn(123);
- VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
- VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
- _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
- .addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of()))
- .addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of()))
- .build()).collect(Collectors.toList())).build();
- _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
- .addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of()))
- .addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of()))
- .build()).collect(Collectors.toList())).build();
}
@AfterMethod
- public void tearDown()
+ public void tearDownMethod()
throws Exception {
_mocks.close();
}
@@ -114,7 +108,7 @@ public class SortedMailboxReceiveOperatorTest {
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -124,7 +118,7 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldThrowOnEmptyCollationKey() {
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), false, 1);
@@ -136,8 +130,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 1000L,
- _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, System.currentTimeMillis() + 1000L, _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -153,7 +146,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -168,7 +161,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -186,7 +179,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(
TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -205,7 +198,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -230,7 +223,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -255,7 +248,7 @@ public class SortedMailboxReceiveOperatorTest {
OperatorTestUtil.block(DATA_SCHEMA, row4), OperatorTestUtil.block(DATA_SCHEMA, row5),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -286,7 +279,7 @@ public class SortedMailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
+ OperatorTestUtil.getOpChainContext(_mailboxService, Long.MAX_VALUE, _stageMetadataBoth);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections,
false, 1)) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index e7fa0e7db2..94d5e2b873 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -20,22 +20,18 @@ package org.apache.pinot.query.runtime.plan.pipeline;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.NamedThreadFactory;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.routing.MailboxMetadata;
@@ -46,8 +42,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -61,12 +57,22 @@ import static org.mockito.Mockito.when;
public class PipelineBreakerExecutorTest {
- private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
private static final DataSchema DATA_SCHEMA =
new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 2, 0, 0, 0);
+ private final VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
+ private final ExecutorService _executor = Executors.newCachedThreadPool();
+ private final OpChainSchedulerService _scheduler = new OpChainSchedulerService(_executor);
+ private final WorkerMetadata _workerMetadata = new WorkerMetadata(_server, ImmutableMap.of(0, new MailboxMetadata(
+ ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0), MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
+ ImmutableList.of(_server, _server)), 1,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)), ImmutableList.of(_server)), 2,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)), ImmutableList.of(_server))),
+ ImmutableMap.of());
+ private final StageMetadata _stageMetadata = new StageMetadata(ImmutableList.of(_workerMetadata), ImmutableMap.of());
+
private AutoCloseable _mocks;
@Mock
private MailboxService _mailboxService;
@@ -75,31 +81,8 @@ public class PipelineBreakerExecutorTest {
@Mock
private ReceivingMailbox _mailbox2;
- private VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
- private ExecutorService _executor = Executors.newCachedThreadPool(
- new NamedThreadFactory("worker_on_asd_" + getClass().getSimpleName()));
- private OpChainSchedulerService _scheduler = new OpChainSchedulerService(_executor);
- private StageMetadata _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_server).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
- .addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
- ImmutableList.of(_server), ImmutableMap.of()))
- .addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(_server), ImmutableMap.of()))
- .addMailBoxInfoMap(2, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
- ImmutableList.of(_server), ImmutableMap.of()))
- .build()).collect(Collectors.toList())).build();
-
- @AfterClass
- public void tearDownClass() {
- ExecutorServiceUtils.close(_executor);
- }
-
@BeforeMethod
- public void setUp() {
+ public void setUpMethod() {
_mocks = MockitoAnnotations.openMocks(this);
when(_mailboxService.getHostname()).thenReturn("localhost");
when(_mailboxService.getPort()).thenReturn(123);
@@ -109,18 +92,22 @@ public class PipelineBreakerExecutorTest {
}
@AfterMethod
- public void tearDown()
+ public void tearDownMethod()
throws Exception {
_mocks.close();
}
+ @AfterClass
+ public void tearDown() {
+ ExecutorServiceUtils.close(_executor);
+ }
+
@Test
public void shouldReturnBlocksUponNormalOperation() {
MailboxReceiveNode mailboxReceiveNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- DistributedStagePlan distributedStagePlan =
- new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1);
+ StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -131,8 +118,8 @@ public class PipelineBreakerExecutorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 1, _server)));
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
- Collections.emptyMap(), 0, Long.MAX_VALUE);
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+ ImmutableMap.of(), 0, Long.MAX_VALUE);
// then
// should have single PB result, receive 2 data blocks, EOS block shouldn't be included
@@ -155,11 +142,10 @@ public class PipelineBreakerExecutorTest {
new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
JoinNode joinNode =
- new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
+ new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(mailboxReceiveNode2);
- DistributedStagePlan distributedStagePlan =
- new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+ StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -172,8 +158,8 @@ public class PipelineBreakerExecutorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 2, _server)));
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
- Collections.emptyMap(), 0, Long.MAX_VALUE);
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+ ImmutableMap.of(), 0, Long.MAX_VALUE);
// then
// should have two PB result, receive 2 data blocks, one each, EOS block shouldn't be included
@@ -195,13 +181,12 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode incorrectlyConfiguredMailboxNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- DistributedStagePlan distributedStagePlan =
- new DistributedStagePlan(0, RECEIVER_ADDRESS, incorrectlyConfiguredMailboxNode, _stageMetadata1);
+ StagePlan stagePlan = new StagePlan(0, incorrectlyConfiguredMailboxNode, _stageMetadata);
// when
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
- Collections.emptyMap(), 0, Long.MAX_VALUE);
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+ ImmutableMap.of(), 0, Long.MAX_VALUE);
// then
// should return empty block list
@@ -219,8 +204,7 @@ public class PipelineBreakerExecutorTest {
MailboxReceiveNode mailboxReceiveNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
- DistributedStagePlan distributedStagePlan =
- new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1);
+ StagePlan stagePlan = new StagePlan(0, mailboxReceiveNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -231,8 +215,8 @@ public class PipelineBreakerExecutorTest {
});
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
- Collections.emptyMap(), 0, System.currentTimeMillis() + 100);
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+ ImmutableMap.of(), 0, System.currentTimeMillis() + 100);
// then
// should contain only failure error blocks
@@ -253,11 +237,10 @@ public class PipelineBreakerExecutorTest {
new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
JoinNode joinNode =
- new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
+ new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(incorrectlyConfiguredMailboxNode);
- DistributedStagePlan distributedStagePlan =
- new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+ StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -270,8 +253,8 @@ public class PipelineBreakerExecutorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
- Collections.emptyMap(), 0, Long.MAX_VALUE);
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+ ImmutableMap.of(), 0, Long.MAX_VALUE);
// then
// should pass when one PB returns result, the other returns empty.
@@ -292,11 +275,10 @@ public class PipelineBreakerExecutorTest {
new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
JoinNode joinNode =
- new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
+ new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, ImmutableList.of());
joinNode.addInput(mailboxReceiveNode1);
joinNode.addInput(incorrectlyConfiguredMailboxNode);
- DistributedStagePlan distributedStagePlan =
- new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+ StagePlan stagePlan = new StagePlan(0, joinNode, _stageMetadata);
// when
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
@@ -309,8 +291,8 @@ public class PipelineBreakerExecutorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
- Collections.emptyMap(), 0, Long.MAX_VALUE);
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, _workerMetadata, stagePlan,
+ ImmutableMap.of(), 0, Long.MAX_VALUE);
// then
// should fail even if one of the 2 PB doesn't contain error block from sender.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java
deleted file mode 100644
index 9ca24ebf48..0000000000
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtilsTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.query.runtime.plan.serde;
-
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class QueryPlanSerDeUtilsTest {
-
- @Test
- public void shouldSerializeServer() {
- // Given:
- VirtualServerAddress server = Mockito.mock(VirtualServerAddress.class);
- Mockito.when(server.workerId()).thenReturn(1);
- Mockito.when(server.hostname()).thenReturn("Server_192.987.1.123");
- Mockito.when(server.port()).thenReturn(80);
-
- // When:
- String serialized = QueryPlanSerDeUtils.addressToProto(server);
-
- // Then:
- Assert.assertEquals(serialized, "1@Server_192.987.1.123:80");
- }
-
- @Test
- public void shouldDeserializeServerString() {
- // Given:
- String serverString = "1@Server_192.987.1.123:80";
-
- // When:
- VirtualServerAddress server = QueryPlanSerDeUtils.protoToAddress(serverString);
-
- // Then:
- Assert.assertEquals(server.workerId(), 1);
- Assert.assertEquals(server.hostname(), "Server_192.987.1.123");
- Assert.assertEquals(server.port(), 80);
- }
-}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 33b68f807e..f1315aa1bf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -58,9 +58,9 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.StagePlan;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -154,29 +154,24 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
protected List<CompletableFuture<?>> processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan,
int stageId, Map<String, String> requestMetadataMap) {
- Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
- dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap();
+ DispatchablePlanFragment dispatchableStagePlan = dispatchableSubPlan.getQueryStageList().get(stageId);
+ List<WorkerMetadata> stageWorkerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
- for (Map.Entry<QueryServerInstance, List<Integer>> entry : serverInstanceToWorkerIdMap.entrySet()) {
- QueryServerInstance server = entry.getKey();
- for (int workerId : entry.getValue()) {
- DistributedStagePlan distributedStagePlan =
- constructDistributedStagePlan(dispatchableSubPlan, stageId, new VirtualServerAddress(server, workerId));
- submissionStubs.add(_servers.get(server).processQuery(distributedStagePlan, requestMetadataMap));
+ for (Map.Entry<QueryServerInstance, List<Integer>> entry : dispatchableStagePlan.getServerInstanceToWorkerIdMap()
+ .entrySet()) {
+ QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey());
+ List<WorkerMetadata> workerMetadataList =
+ entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList());
+ StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
+ StagePlan stagePlan =
+ new StagePlan(stageId, dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata);
+ for (WorkerMetadata workerMetadata : workerMetadataList) {
+ submissionStubs.add(serverEnclosure.processQuery(workerMetadata, stagePlan, requestMetadataMap));
}
}
return submissionStubs;
}
- protected static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,
- int stageId, VirtualServerAddress serverAddress) {
- return new DistributedStagePlan(stageId, serverAddress,
- dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(),
- new StageMetadata.Builder().setWorkerMetadataList(
- dispatchableSubPlan.getQueryStageList().get(stageId).getWorkerMetadataList())
- .addCustomProperties(dispatchableSubPlan.getQueryStageList().get(stageId).getCustomProperties()).build());
- }
-
protected List<Object[]> queryH2(String sql)
throws Exception {
int firstSemi = sql.indexOf(';');
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index 4e5a003427..679f46c60c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.service.server;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -26,6 +27,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
@@ -49,12 +51,15 @@ import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.EqualityUtils;
import org.apache.pinot.util.TestUtils;
-import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertTrue;
+
public class QueryServerTest extends QueryTestSet {
private static final Random RANDOM_REQUEST_ID_GEN = new Random();
@@ -70,10 +75,9 @@ public class QueryServerTest extends QueryTestSet {
@BeforeClass
public void setUp()
throws Exception {
-
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryTestUtils.getAvailablePort();
- QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
+ QueryRunner queryRunner = mock(QueryRunner.class);
QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
@@ -96,85 +100,81 @@ public class QueryServerTest extends QueryTestSet {
}
@Test
- public void testException() {
- DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery("SELECT * FROM a");
+ public void testException()
+ throws Exception {
+ DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a");
// only get one worker request out.
- Worker.QueryRequest queryRequest = getQueryRequest(dispatchableSubPlan, 1);
- QueryRunner mockRunner =
- _queryRunnerMap.get(Integer.parseInt(queryRequest.getMetadataOrThrow(KEY_OF_SERVER_INSTANCE_PORT)));
- Mockito.doThrow(new RuntimeException("foo")).when(mockRunner).processQuery(Mockito.any(), Mockito.anyMap());
+ Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+ Map<String, String> requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+ QueryRunner mockRunner = _queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT)));
+ doThrow(new RuntimeException("foo")).when(mockRunner).processQuery(any(), any(), any());
// submit the request for testing.
- Worker.QueryResponse resp = submitRequest(queryRequest);
+ Worker.QueryResponse resp = submitRequest(queryRequest, requestMetadata);
// reset the mock runner before assert.
- Mockito.reset(mockRunner);
+ reset(mockRunner);
// should contain error message pattern
String errorMessage = resp.getMetadataMap().get(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR);
- Assert.assertTrue(errorMessage.contains("foo"));
+ assertTrue(errorMessage.contains("foo"));
}
@Test(dataProvider = "testSql")
public void testWorkerAcceptsWorkerRequestCorrect(String sql)
throws Exception {
- DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
-
- for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
- if (stageId > 0) { // we do not test reduce stage.
- // only get one worker request out.
- Worker.QueryRequest queryRequest = getQueryRequest(dispatchableSubPlan, stageId);
-
- // submit the request for testing.
- Worker.QueryResponse resp = submitRequest(queryRequest);
- Assert.assertNotNull(resp.getMetadataMap().get(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK));
+ DispatchableSubPlan queryPlan = _queryEnvironment.planQuery(sql);
+ List<DispatchablePlanFragment> stagePlans = queryPlan.getQueryStageList();
+ int numStages = stagePlans.size();
+ // Ignore reduce stage (stage 0)
+ for (int stageId = 1; stageId < numStages; stageId++) {
+ // only get one worker request out.
+ Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, stageId);
+ Map<String, String> requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
- DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
+ // submit the request for testing.
+ Worker.QueryResponse resp = submitRequest(queryRequest, requestMetadata);
+ assertTrue(resp.getMetadataMap().containsKey(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK));
- StageMetadata stageMetadata =
- new StageMetadata.Builder().setWorkerMetadataList(dispatchablePlanFragment.getWorkerMetadataList())
- .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();
+ DispatchablePlanFragment dispatchableStagePlan = stagePlans.get(stageId);
+ List<WorkerMetadata> workerMetadataList = dispatchableStagePlan.getWorkerMetadataList();
+ StageMetadata stageMetadata = new StageMetadata(workerMetadataList, dispatchableStagePlan.getCustomProperties());
- // ensure mock query runner received correctly deserialized payload.
- QueryRunner mockRunner =
- _queryRunnerMap.get(Integer.parseInt(queryRequest.getMetadataOrThrow(KEY_OF_SERVER_INSTANCE_PORT)));
- String requestIdStr = queryRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID);
+ // ensure mock query runner received correctly deserialized payload.
+ QueryRunner mockRunner = _queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT)));
+ String requestId = requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID);
- // since submitRequest is async, we need to wait for the mockRunner to receive the query payload.
- int finalStageId = stageId;
- TestUtils.waitForCondition(aVoid -> {
- try {
- Mockito.verify(mockRunner).processQuery(Mockito.argThat(distributedStagePlan -> {
- PlanNode planNode =
- dispatchableSubPlan.getQueryStageList().get(finalStageId).getPlanFragment().getFragmentRoot();
- return isStageNodesEqual(planNode, distributedStagePlan.getStageRoot()) && isStageMetadataEqual(
- stageMetadata, distributedStagePlan.getStageMetadata());
- }), Mockito.argThat(requestMetadataMap -> requestIdStr.equals(
- requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID))));
- return true;
- } catch (Throwable t) {
- return false;
- }
- }, 10000L, "Error verifying mock QueryRunner intercepted query payload!");
+ // since submitRequest is async, we need to wait for the mockRunner to receive the query payload.
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ verify(mockRunner, times(workerMetadataList.size())).processQuery(any(), argThat(stagePlan -> {
+ PlanNode planNode = dispatchableStagePlan.getPlanFragment().getFragmentRoot();
+ return isStageNodesEqual(planNode, stagePlan.getRootNode()) && isStageMetadataEqual(stageMetadata,
+ stagePlan.getStageMetadata());
+ }), argThat(requestMetadataMap -> requestId.equals(
+ requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID))));
+ return true;
+ } catch (Throwable t) {
+ return false;
+ }
+ }, 10000L, "Error verifying mock QueryRunner intercepted query payload!");
- // reset the mock runner.
- Mockito.reset(mockRunner);
- }
+ // reset the mock runner.
+ reset(mockRunner);
}
}
private boolean isStageMetadataEqual(StageMetadata expected, StageMetadata actual) {
- if (!EqualityUtils.isEqual(StageMetadata.getTableName(expected), StageMetadata.getTableName(actual))) {
- return false;
- }
- TimeBoundaryInfo expectedTimeBoundaryInfo = StageMetadata.getTimeBoundary(expected);
- TimeBoundaryInfo actualTimeBoundaryInfo = StageMetadata.getTimeBoundary(actual);
- if (expectedTimeBoundaryInfo == null && actualTimeBoundaryInfo != null
- || expectedTimeBoundaryInfo != null && actualTimeBoundaryInfo == null) {
+ if (!Objects.equals(expected.getTableName(), actual.getTableName())) {
return false;
}
- if (expectedTimeBoundaryInfo != null && actualTimeBoundaryInfo != null && (
- !EqualityUtils.isEqual(expectedTimeBoundaryInfo.getTimeColumn(), actualTimeBoundaryInfo.getTimeColumn())
- || !EqualityUtils.isEqual(expectedTimeBoundaryInfo.getTimeValue(),
- actualTimeBoundaryInfo.getTimeValue()))) {
- return false;
+ TimeBoundaryInfo expectedTimeBoundaryInfo = expected.getTimeBoundary();
+ TimeBoundaryInfo actualTimeBoundaryInfo = actual.getTimeBoundary();
+ if (expectedTimeBoundaryInfo != null || actualTimeBoundaryInfo != null) {
+ if (expectedTimeBoundaryInfo == null || actualTimeBoundaryInfo == null) {
+ return false;
+ }
+ if (!expectedTimeBoundaryInfo.getTimeColumn().equals(actualTimeBoundaryInfo.getTimeColumn())
+ || !expectedTimeBoundaryInfo.getTimeValue().equals(actualTimeBoundaryInfo.getTimeValue())) {
+ return false;
+ }
}
List<WorkerMetadata> expectedWorkerMetadataList = expected.getWorkerMetadataList();
List<WorkerMetadata> actualWorkerMetadataList = actual.getWorkerMetadataList();
@@ -190,13 +190,8 @@ public class QueryServerTest extends QueryTestSet {
}
private static boolean isWorkerMetadataEqual(WorkerMetadata expected, WorkerMetadata actual) {
- if (!expected.getVirtualServerAddress().hostname().equals(actual.getVirtualServerAddress().hostname())
- || expected.getVirtualServerAddress().port() != actual.getVirtualServerAddress().port()
- || expected.getVirtualServerAddress().workerId() != actual.getVirtualServerAddress().workerId()) {
- return false;
- }
- return EqualityUtils.isEqual(WorkerMetadata.getTableSegmentsMap(expected),
- WorkerMetadata.getTableSegmentsMap(actual));
+ return expected.getVirtualAddress().equals(actual.getVirtualAddress()) && EqualityUtils.isEqual(
+ expected.getTableSegmentsMap(), actual.getTableSegmentsMap());
}
private static boolean isStageNodesEqual(PlanNode left, PlanNode right) {
@@ -216,11 +211,10 @@ public class QueryServerTest extends QueryTestSet {
return true;
}
- private Worker.QueryResponse submitRequest(Worker.QueryRequest queryRequest) {
- String host = queryRequest.getMetadataMap().get(KEY_OF_SERVER_INSTANCE_HOST);
- int port = Integer.parseInt(queryRequest.getMetadataMap().get(KEY_OF_SERVER_INSTANCE_PORT));
- long timeoutMs =
- Long.parseLong(queryRequest.getMetadataMap().get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+ private Worker.QueryResponse submitRequest(Worker.QueryRequest queryRequest, Map<String, String> requestMetadata) {
+ String host = requestMetadata.get(KEY_OF_SERVER_INSTANCE_HOST);
+ int port = Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT));
+ long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub stub = PinotQueryWorkerGrpc.newBlockingStub(channel);
Worker.QueryResponse resp =
@@ -229,32 +223,35 @@ public class QueryServerTest extends QueryTestSet {
return resp;
}
- private Worker.QueryRequest getQueryRequest(DispatchableSubPlan dispatchableSubPlan, int stageId) {
- DispatchablePlanFragment planFragment = dispatchableSubPlan.getQueryStageList().get(stageId);
- Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = planFragment.getServerInstanceToWorkerIdMap();
+ private Worker.QueryRequest getQueryRequest(DispatchableSubPlan queryPlan, int stageId) {
+ DispatchablePlanFragment stagePlan = queryPlan.getQueryStageList().get(stageId);
+ Plan.StageNode rootNode =
+ StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) stagePlan.getPlanFragment().getFragmentRoot());
+ List<Worker.WorkerMetadata> workerMetadataList =
+ QueryPlanSerDeUtils.toProtoWorkerMetadataList(stagePlan.getWorkerMetadataList());
+ ByteString customProperty = QueryPlanSerDeUtils.toProtoProperties(stagePlan.getCustomProperties());
+
// this particular test set requires the request to have a single QueryServerInstance to dispatch to
// as it is not testing the multi-tenancy dispatch (which is in the QueryDispatcherTest)
- Map.Entry<QueryServerInstance, List<Integer>> entry = serverInstanceToWorkerIdMap.entrySet().iterator().next();
- QueryServerInstance serverInstance = entry.getKey();
- List<Integer> workerIds = entry.getValue();
- Plan.StageNode stageRoot =
- StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) planFragment.getPlanFragment().getFragmentRoot());
- List<Worker.WorkerMetadata> protoWorkerMetadataList = QueryPlanSerDeUtils.toProtoWorkerMetadataList(planFragment);
+ QueryServerInstance serverInstance = stagePlan.getServerInstanceToWorkerIdMap().keySet().iterator().next();
Worker.StageMetadata stageMetadata =
- QueryPlanSerDeUtils.toProtoStageMetadata(protoWorkerMetadataList, planFragment.getCustomProperties(),
- serverInstance, workerIds);
- Worker.StagePlan stagePlan =
- Worker.StagePlan.newBuilder().setStageId(stageId).setStageRoot(stageRoot).setStageMetadata(stageMetadata)
+ Worker.StageMetadata.newBuilder().addAllWorkerMetadata(workerMetadataList).setCustomProperty(customProperty)
.build();
+ Worker.StagePlan protoStagePlan =
+ Worker.StagePlan.newBuilder().setStageId(stageId).setRootNode(rootNode.toByteString())
+ .setStageMetadata(stageMetadata).build();
+
+ Map<String, String> requestMetadata = new HashMap<>();
+ // the default configurations that must exist.
+ requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
+ String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+ requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
+ String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+ // extra configurations we want to test also parsed out correctly.
+ requestMetadata.put(KEY_OF_SERVER_INSTANCE_HOST, serverInstance.getHostname());
+ requestMetadata.put(KEY_OF_SERVER_INSTANCE_PORT, Integer.toString(serverInstance.getQueryServicePort()));
- return Worker.QueryRequest.newBuilder().addStagePlan(stagePlan)
- // the default configurations that must exist.
- .putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID,
- String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()))
- .putMetadata(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
- String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS))
- // extra configurations we want to test also parsed out correctly.
- .putMetadata(KEY_OF_SERVER_INSTANCE_HOST, serverInstance.getHostname())
- .putMetadata(KEY_OF_SERVER_INSTANCE_PORT, String.valueOf(serverInstance.getQueryServicePort())).build();
+ return Worker.QueryRequest.newBuilder().addStagePlan(protoStagePlan)
+ .setMetadata(QueryPlanSerDeUtils.toProtoProperties(requestMetadata)).build();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org