You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/05/07 06:58:04 UTC
[pinot] branch master updated: Refactor all StageNodes to PlanNodes during query planning phase (#10735)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7c3dd06a9b Refactor all StageNodes to PlanNodes during query planning phase (#10735)
7c3dd06a9b is described below
commit 7c3dd06a9b0966551b84512af97259c6c5c35537
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Sat May 6 23:57:58 2023 -0700
Refactor all StageNodes to PlanNodes during query planning phase (#10735)
---
.../PinotAggregateExchangeNodeInsertRule.java | 7 +-
...ageVisitor.java => ExplainPlanPlanVisitor.java} | 50 +++++++-------
.../apache/pinot/query/planner/PlannerUtils.java | 8 +--
.../org/apache/pinot/query/planner/QueryPlan.java | 40 ++++++------
.../query/planner/logical/RelToStageConverter.java | 46 ++++++-------
.../planner/logical/ShuffleRewriteVisitor.java | 38 +++++------
.../query/planner/logical/StageFragmenter.java | 76 +++++++++++-----------
.../pinot/query/planner/logical/StagePlanner.java | 27 ++++----
.../planner/physical/DispatchablePlanContext.java | 6 +-
.../planner/physical/DispatchablePlanMetadata.java | 9 +--
.../planner/physical/DispatchablePlanVisitor.java | 57 ++++++++--------
.../planner/physical/MailboxAssignmentVisitor.java | 29 +++++----
.../colocated/GreedyShuffleRewriteContext.java | 76 +++++++++++-----------
.../GreedyShuffleRewritePreComputeVisitor.java | 38 +++++------
.../colocated/GreedyShuffleRewriteVisitor.java | 70 ++++++++++----------
.../AbstractPlanNode.java} | 30 ++++-----
.../planner/{stage => plannode}/AggregateNode.java | 15 +++--
.../DefaultPostOrderTraversalVisitor.java | 6 +-
.../planner/{stage => plannode}/ExchangeNode.java | 10 +--
.../planner/{stage => plannode}/FilterNode.java | 10 +--
.../planner/{stage => plannode}/JoinNode.java | 14 ++--
.../{stage => plannode}/MailboxReceiveNode.java | 20 +++---
.../{stage => plannode}/MailboxSendNode.java | 14 ++--
.../StageNode.java => plannode/PlanNode.java} | 20 +++---
.../PlanNodeVisitor.java} | 15 +++--
.../planner/{stage => plannode}/ProjectNode.java | 10 +--
.../planner/{stage => plannode}/SetOpNode.java | 14 ++--
.../planner/{stage => plannode}/SortNode.java | 15 +++--
.../{stage => plannode}/StageNodeSerDeUtils.java | 52 +++++++--------
.../planner/{stage => plannode}/TableScanNode.java | 14 ++--
.../planner/{stage => plannode}/ValueNode.java | 10 +--
.../planner/{stage => plannode}/WindowNode.java | 15 +++--
.../pinot/query/planner/serde/ProtoProperties.java | 3 +-
...tageMetadata.java => PlanFragmentMetadata.java} | 14 ++--
.../apache/pinot/query/routing/WorkerManager.java | 6 +-
.../apache/pinot/query/routing/WorkerMetadata.java | 4 +-
.../apache/pinot/query/QueryCompilationTest.java | 68 ++++++++++---------
.../{stage => plannode}/SerDeUtilsTest.java | 16 ++---
.../apache/pinot/query/runtime/QueryRunner.java | 20 +++---
.../query/runtime/operator/HashJoinOperator.java | 2 +-
.../runtime/operator/WindowAggregateOperator.java | 2 +-
.../runtime/operator/utils/OperatorUtils.java | 8 +--
.../query/runtime/plan/DistributedStagePlan.java | 28 ++++----
.../runtime/plan/OpChainExecutionContext.java | 12 ++--
.../query/runtime/plan/PhysicalPlanVisitor.java | 42 ++++++------
.../query/runtime/plan/PlanRequestContext.java | 12 ++--
.../runtime/plan/ServerRequestPlanVisitor.java | 36 +++++-----
.../runtime/plan/serde/QueryPlanSerDeUtils.java | 18 ++---
.../plan/server/ServerPlanRequestContext.java | 6 +-
.../query/service/dispatch/QueryDispatcher.java | 12 ++--
.../pinot/query/runtime/QueryRunnerTest.java | 2 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 2 +-
.../runtime/operator/HashJoinOperatorTest.java | 2 +-
.../operator/MailboxReceiveOperatorTest.java | 34 +++++-----
.../runtime/operator/MailboxSendOperatorTest.java | 6 +-
.../pinot/query/runtime/operator/OpChainTest.java | 14 ++--
.../operator/SortedMailboxReceiveOperatorTest.java | 38 +++++------
.../operator/WindowAggregateOperatorTest.java | 2 +-
.../pinot/query/service/QueryServerTest.java | 33 +++++-----
.../service/dispatch/QueryDispatcherTest.java | 2 +-
60 files changed, 663 insertions(+), 642 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index 99736870bf..6af2c983fb 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -51,7 +51,7 @@ import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
/**
@@ -95,8 +95,9 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
}
/**
- * Split the AGG into 2 stages, both with the same AGG type,
- * Pinot internal stage optimization can use the info of the input data type to infer whether it should generate
+ * Split the AGG into 2 plan fragments, both with the same AGG type,
+ * Pinot internal plan fragment optimization can use the info of the input data type to infer whether it should
+ * generate
* the "intermediate-stage AGG operator" or a "leaf-stage AGG operator"
* @see org.apache.pinot.core.query.aggregation.function.AggregationFunction
*
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java
similarity index 84%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java
index e639701256..7b5e935010 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java
@@ -23,20 +23,20 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.routing.QueryServerInstance;
@@ -46,7 +46,7 @@ import org.apache.pinot.query.routing.QueryServerInstance;
* <p>It is currently not used programmatically and cannot be accessed by the user. Instead,
* it is intended for use in manual debugging (e.g. setting breakpoints and calling QueryPlan#explain()).
*/
-public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder, ExplainPlanStageVisitor.Context> {
+public class ExplainPlanPlanVisitor implements PlanNodeVisitor<StringBuilder, ExplainPlanPlanVisitor.Context> {
private final QueryPlan _queryPlan;
@@ -80,23 +80,23 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
*
* @return a query plan associated with
*/
- public static String explainFrom(QueryPlan queryPlan, StageNode node, QueryServerInstance rootServer) {
- final ExplainPlanStageVisitor visitor = new ExplainPlanStageVisitor(queryPlan);
+ public static String explainFrom(QueryPlan queryPlan, PlanNode node, QueryServerInstance rootServer) {
+ final ExplainPlanPlanVisitor visitor = new ExplainPlanPlanVisitor(queryPlan);
return node
.visit(visitor, new Context(rootServer, 0, "", "", new StringBuilder()))
.toString();
}
- private ExplainPlanStageVisitor(QueryPlan queryPlan) {
+ private ExplainPlanPlanVisitor(QueryPlan queryPlan) {
_queryPlan = queryPlan;
}
- private StringBuilder appendInfo(StageNode node, Context context) {
- int stage = node.getStageId();
+ private StringBuilder appendInfo(PlanNode node, Context context) {
+ int planFragmentId = node.getPlanFragmentId();
context._builder
.append(context._prefix)
.append('[')
- .append(stage)
+ .append(planFragmentId)
.append("]@")
.append(context._host.getHostname())
.append(':')
@@ -106,7 +106,7 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
return context._builder;
}
- private StringBuilder visitSimpleNode(StageNode node, Context context) {
+ private StringBuilder visitSimpleNode(PlanNode node, Context context) {
appendInfo(node, context).append('\n');
return node.getInputs().get(0).visit(this, context.next(false, context._host, context._workerId));
}
@@ -124,7 +124,7 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
@Override
public StringBuilder visitSetOp(SetOpNode setOpNode, Context context) {
appendInfo(setOpNode, context).append('\n');
- for (StageNode input : setOpNode.getInputs()) {
+ for (PlanNode input : setOpNode.getInputs()) {
input.visit(this, context.next(false, context._host, context._workerId));
}
return context._builder;
@@ -195,7 +195,7 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
.getServerInstanceToWorkerIdMap();
context._builder.append("->");
String receivers = servers.entrySet().stream()
- .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+ .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.map(s -> "[" + receiverStageId + "]@" + s)
.collect(Collectors.joining(",", "{", "}"));
return context._builder.append(receivers);
@@ -216,7 +216,7 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
return appendInfo(node, context)
.append(' ')
.append(_queryPlan.getDispatchablePlanMetadataMap()
- .get(node.getStageId())
+ .get(node.getPlanFragmentId())
.getWorkerIdToSegmentsMap()
.get(context._host))
.append('\n');
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
index c3ce9fc116..ad92eab6e8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
@@ -36,12 +36,12 @@ public class PlannerUtils {
// do not instantiate.
}
- public static boolean isRootStage(int stageId) {
- return stageId == 0;
+ public static boolean isRootPlanFragment(int planFragmentId) {
+ return planFragmentId == 0;
}
- public static boolean isFinalStage(int stageId) {
- return stageId == 1;
+ public static boolean isFinalPlanFragment(int planFragmentId) {
+ return planFragmentId == 1;
}
public static String explainPlan(RelNode relRoot, SqlExplainFormat format, SqlExplainLevel explainLevel) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
index e21e1923ec..25f410cf84 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -23,10 +23,10 @@ import java.util.List;
import java.util.Map;
import org.apache.calcite.util.Pair;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -45,32 +45,32 @@ import org.apache.pinot.query.routing.WorkerMetadata;
*/
public class QueryPlan {
private final List<Pair<Integer, String>> _queryResultFields;
- private final Map<Integer, StageNode> _queryStageMap;
- private final List<StageMetadata> _stageMetadataList;
+ private final Map<Integer, PlanNode> _queryStageMap;
+ private final List<PlanFragmentMetadata> _planFragmentMetadataList;
private final Map<Integer, DispatchablePlanMetadata> _dispatchablePlanMetadataMap;
- public QueryPlan(List<Pair<Integer, String>> fields, Map<Integer, StageNode> queryStageMap,
+ public QueryPlan(List<Pair<Integer, String>> fields, Map<Integer, PlanNode> queryStageMap,
Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
_queryResultFields = fields;
_queryStageMap = queryStageMap;
_dispatchablePlanMetadataMap = dispatchablePlanMetadataMap;
- _stageMetadataList = constructStageMetadataList(_dispatchablePlanMetadataMap);
+ _planFragmentMetadataList = constructStageMetadataList(_dispatchablePlanMetadataMap);
}
/**
* Get the map between stageID and the stage plan root node.
* @return stage plan map.
*/
- public Map<Integer, StageNode> getQueryStageMap() {
+ public Map<Integer, PlanNode> getQueryStageMap() {
return _queryStageMap;
}
/**
- * Get the stage metadata information based on stageId.
+ * Get the stage metadata information based on planFragmentId.
* @return stage metadata info.
*/
- public StageMetadata getStageMetadata(int stageId) {
- return _stageMetadataList.get(stageId);
+ public PlanFragmentMetadata getStageMetadata(int planFragmentId) {
+ return _planFragmentMetadataList.get(planFragmentId);
}
/**
@@ -93,21 +93,21 @@ public class QueryPlan {
* Explains the {@code QueryPlan}
*
* @return a human-readable tree explaining the query plan
- * @see ExplainPlanStageVisitor#explain(QueryPlan)
+ * @see ExplainPlanPlanVisitor#explain(QueryPlan)
* @apiNote this is <b>NOT</b> identical to the SQL {@code EXPLAIN PLAN FOR} functionality
* and is instead intended to be used by developers debugging during feature
* development
*/
public String explain() {
- return ExplainPlanStageVisitor.explain(this);
+ return ExplainPlanPlanVisitor.explain(this);
}
/**
* Convert the {@link DispatchablePlanMetadata} into dispatchable info for each stage/worker.
*/
- private static List<StageMetadata> constructStageMetadataList(
+ private static List<PlanFragmentMetadata> constructStageMetadataList(
Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
- StageMetadata[] stageMetadataList = new StageMetadata[dispatchablePlanMetadataMap.size()];
+ PlanFragmentMetadata[] planFragmentMetadataList = new PlanFragmentMetadata[dispatchablePlanMetadataMap.size()];
for (Map.Entry<Integer, DispatchablePlanMetadata> dispatchableEntry : dispatchablePlanMetadataMap.entrySet()) {
DispatchablePlanMetadata dispatchablePlanMetadata = dispatchableEntry.getValue();
@@ -119,9 +119,9 @@ public class QueryPlan {
VirtualServerAddress virtualServerAddress = new VirtualServerAddress(queryServerEntry.getKey(), workerId);
WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
builder.setVirtualServerAddress(virtualServerAddress);
- Map<Integer, MailboxMetadata> stageToMailboxMetadata =
+ Map<Integer, MailboxMetadata> planFragmentToMailboxMetadata =
dispatchablePlanMetadata.getWorkerIdToMailBoxIdsMap().get(workerId);
- builder.putAllMailBoxInfosMap(stageToMailboxMetadata);
+ builder.putAllMailBoxInfosMap(planFragmentToMailboxMetadata);
if (dispatchablePlanMetadata.getScannedTables().size() == 1) {
builder.addTableSegmentsMap(dispatchablePlanMetadata.getWorkerIdToSegmentsMap().get(workerId));
}
@@ -130,8 +130,8 @@ public class QueryPlan {
}
// construct the stageMetadata
- int stageId = dispatchableEntry.getKey();
- StageMetadata.Builder builder = new StageMetadata.Builder();
+ int planFragmentId = dispatchableEntry.getKey();
+ PlanFragmentMetadata.Builder builder = new PlanFragmentMetadata.Builder();
builder.setWorkerMetadataList(Arrays.asList(workerMetadataList));
if (dispatchablePlanMetadata.getScannedTables().size() == 1) {
builder.addTableName(dispatchablePlanMetadata.getScannedTables().get(0));
@@ -139,8 +139,8 @@ public class QueryPlan {
if (dispatchablePlanMetadata.getTimeBoundaryInfo() != null) {
builder.addTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo());
}
- stageMetadataList[stageId] = builder.build();
+ planFragmentMetadataList[planFragmentId] = builder.build();
}
- return Arrays.asList(stageMetadataList);
+ return Arrays.asList(planFragmentMetadataList);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index ea0c46b390..3d353b2514 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -44,22 +44,22 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.spi.data.FieldSpec;
/**
- * The {@code StageNodeConverter} converts a logical {@link RelNode} to a {@link StageNode}.
+ * The {@code StageNodeConverter} converts a logical {@link RelNode} to a {@link PlanNode}.
*/
public final class RelToStageConverter {
@@ -75,7 +75,7 @@ public final class RelToStageConverter {
* @param node relational node
* @return stage node.
*/
- public static StageNode toStageNode(RelNode node, int currentStageId) {
+ public static PlanNode toStageNode(RelNode node, int currentStageId) {
if (node instanceof LogicalTableScan) {
return convertLogicalTableScan((LogicalTableScan) node, currentStageId);
} else if (node instanceof LogicalJoin) {
@@ -101,7 +101,7 @@ public final class RelToStageConverter {
}
}
- private static StageNode convertLogicalExchange(Exchange node, int currentStageId) {
+ private static PlanNode convertLogicalExchange(Exchange node, int currentStageId) {
RelCollation collation = null;
boolean isSortOnSender = false;
boolean isSortOnReceiver = false;
@@ -118,47 +118,47 @@ public final class RelToStageConverter {
isSortOnSender, isSortOnReceiver);
}
- private static StageNode convertLogicalSetOp(SetOp node, int currentStageId) {
+ private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
return new SetOpNode(SetOpNode.SetOpType.fromObject(node), currentStageId, toDataSchema(node.getRowType()),
node.all);
}
- private static StageNode convertLogicalValues(LogicalValues node, int currentStageId) {
+ private static PlanNode convertLogicalValues(LogicalValues node, int currentStageId) {
return new ValueNode(currentStageId, toDataSchema(node.getRowType()), node.tuples);
}
- private static StageNode convertLogicalWindow(LogicalWindow node, int currentStageId) {
+ private static PlanNode convertLogicalWindow(LogicalWindow node, int currentStageId) {
return new WindowNode(currentStageId, node.groups, node.constants, toDataSchema(node.getRowType()));
}
- private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
+ private static PlanNode convertLogicalSort(LogicalSort node, int currentStageId) {
int fetch = RexExpressionUtils.getValueAsInt(node.fetch);
int offset = RexExpressionUtils.getValueAsInt(node.offset);
return new SortNode(currentStageId, node.getCollation().getFieldCollations(), fetch, offset,
toDataSchema(node.getRowType()));
}
- private static StageNode convertLogicalAggregate(LogicalAggregate node, int currentStageId) {
+ private static PlanNode convertLogicalAggregate(LogicalAggregate node, int currentStageId) {
return new AggregateNode(currentStageId, toDataSchema(node.getRowType()), node.getAggCallList(),
RexExpression.toRexInputRefs(node.getGroupSet()), node.getHints());
}
- private static StageNode convertLogicalProject(LogicalProject node, int currentStageId) {
+ private static PlanNode convertLogicalProject(LogicalProject node, int currentStageId) {
return new ProjectNode(currentStageId, toDataSchema(node.getRowType()), node.getProjects());
}
- private static StageNode convertLogicalFilter(LogicalFilter node, int currentStageId) {
+ private static PlanNode convertLogicalFilter(LogicalFilter node, int currentStageId) {
return new FilterNode(currentStageId, toDataSchema(node.getRowType()), node.getCondition());
}
- private static StageNode convertLogicalTableScan(LogicalTableScan node, int currentStageId) {
+ private static PlanNode convertLogicalTableScan(LogicalTableScan node, int currentStageId) {
String tableName = node.getTable().getQualifiedName().get(0);
List<String> columnNames =
node.getRowType().getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList());
return new TableScanNode(currentStageId, toDataSchema(node.getRowType()), tableName, columnNames);
}
- private static StageNode convertLogicalJoin(LogicalJoin node, int currentStageId) {
+ private static PlanNode convertLogicalJoin(LogicalJoin node, int currentStageId) {
JoinRelType joinType = node.getJoinType();
// Parse out all equality JOIN conditions
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
index 664ddd44ff..54d03da712 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -26,20 +26,20 @@ import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
/**
@@ -48,10 +48,10 @@ import org.apache.pinot.query.planner.stage.WindowNode;
* a single host. It gathers the information recursively by checking which partitioned
* data is selected by each node in the tree.
*
- * <p>The only method that should be used externally is {@link #optimizeShuffles(StageNode)},
- * other public methods are used only by {@link StageNode#visit(StageNodeVisitor, Object)}.
+ * <p>The only method that should be used externally is {@link #optimizeShuffles(PlanNode)},
+ * other public methods are used only by {@link PlanNode#visit(PlanNodeVisitor, Object)}.
*/
-public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Void> {
+public class ShuffleRewriteVisitor implements PlanNodeVisitor<Set<Integer>, Void> {
/**
* This method rewrites {@code root} <b>in place</b>, removing any unnecessary shuffles
@@ -59,12 +59,12 @@ public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Voi
*
* @param root the root node of the tree to rewrite
*/
- public static void optimizeShuffles(StageNode root) {
+ public static void optimizeShuffles(PlanNode root) {
root.visit(new ShuffleRewriteVisitor(), null);
}
/**
- * Access to this class should only be used via {@link #optimizeShuffles(StageNode)}
+ * Access to this class should only be used via {@link #optimizeShuffles(PlanNode)}
*/
private ShuffleRewriteVisitor() {
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
index e33638d824..d04443f04f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
@@ -22,96 +22,96 @@ import java.util.List;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
-
-
-public class StageFragmenter implements StageNodeVisitor<StageNode, StageFragmenter.Context> {
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+
+
+public class StageFragmenter implements PlanNodeVisitor<PlanNode, StageFragmenter.Context> {
public static final StageFragmenter INSTANCE = new StageFragmenter();
- private StageNode process(StageNode node, Context context) {
- node.setStageId(context._currentStageId);
- List<StageNode> inputs = node.getInputs();
+ private PlanNode process(PlanNode node, Context context) {
+ node.setPlanFragmentId(context._currentStageId);
+ List<PlanNode> inputs = node.getInputs();
for (int i = 0; i < inputs.size(); i++) {
- context._previousStageId = node.getStageId();
+ context._previousStageId = node.getPlanFragmentId();
inputs.set(i, inputs.get(i).visit(this, context));
}
return node;
}
@Override
- public StageNode visitAggregate(AggregateNode node, Context context) {
+ public PlanNode visitAggregate(AggregateNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitFilter(FilterNode node, Context context) {
+ public PlanNode visitFilter(FilterNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitJoin(JoinNode node, Context context) {
+ public PlanNode visitJoin(JoinNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitMailboxReceive(MailboxReceiveNode node, Context context) {
+ public PlanNode visitMailboxReceive(MailboxReceiveNode node, Context context) {
throw new UnsupportedOperationException("MailboxReceiveNode should not be visited by StageFragmenter");
}
@Override
- public StageNode visitMailboxSend(MailboxSendNode node, Context context) {
+ public PlanNode visitMailboxSend(MailboxSendNode node, Context context) {
throw new UnsupportedOperationException("MailboxSendNode should not be visited by StageFragmenter");
}
@Override
- public StageNode visitProject(ProjectNode node, Context context) {
+ public PlanNode visitProject(ProjectNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitSort(SortNode node, Context context) {
+ public PlanNode visitSort(SortNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitTableScan(TableScanNode node, Context context) {
+ public PlanNode visitTableScan(TableScanNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitValue(ValueNode node, Context context) {
+ public PlanNode visitValue(ValueNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitWindow(WindowNode node, Context context) {
+ public PlanNode visitWindow(WindowNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitSetOp(SetOpNode node, Context context) {
+ public PlanNode visitSetOp(SetOpNode node, Context context) {
return process(node, context);
}
@Override
- public StageNode visitExchange(ExchangeNode node, Context context) {
+ public PlanNode visitExchange(ExchangeNode node, Context context) {
int nodeStageId = context._previousStageId;
context._currentStageId++;
- StageNode nextStageRoot = node.getInputs().get(0).visit(this, context);
+ PlanNode nextStageRoot = node.getInputs().get(0).visit(this, context);
List<Integer> distributionKeys = node.getDistributionKeys();
RelDistribution.Type exchangeType = node.getDistributionType();
@@ -122,10 +122,10 @@ public class StageFragmenter implements StageNodeVisitor<StageNode, StageFragmen
KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
? new FieldSelectionKeySelector(distributionKeys) : null;
- StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
+ PlanNode mailboxSender = new MailboxSendNode(nextStageRoot.getPlanFragmentId(), nextStageRoot.getDataSchema(),
nodeStageId, exchangeType, keySelector, node.getCollations(), node.isSortOnSender());
- StageNode mailboxReceiver = new MailboxReceiveNode(nodeStageId, nextStageRoot.getDataSchema(),
- nextStageRoot.getStageId(), exchangeType, keySelector,
+ PlanNode mailboxReceiver = new MailboxReceiveNode(nodeStageId, nextStageRoot.getDataSchema(),
+ nextStageRoot.getPlanFragmentId(), exchangeType, keySelector,
node.getCollations(), node.isSortOnSender(), node.isSortOnReceiver(), mailboxSender);
mailboxSender.addInput(nextStageRoot);
@@ -134,7 +134,7 @@ public class StageFragmenter implements StageNodeVisitor<StageNode, StageFragmen
public static class Context {
- // Stage ID starts with 1, 0 will be reserved for ROOT stage.
+ // Stage ID starts with 1, 0 will be reserved for ROOT PlanFragment.
Integer _currentStageId = 1;
Integer _previousStageId = 1;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 91e1ba7a89..5ca7de8543 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -29,14 +29,14 @@ import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
import org.apache.pinot.query.planner.physical.DispatchablePlanVisitor;
import org.apache.pinot.query.planner.physical.colocated.GreedyShuffleRewriteVisitor;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.WorkerManager;
/**
- * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest of trees with {@link StageNode}.
+ * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest of trees with {@link PlanNode}.
*
* This class is non-threadsafe. Do not reuse the stage planner for multiple query plans.
*/
@@ -64,19 +64,20 @@ public class StagePlanner {
RelNode relRootNode = relRoot.rel;
// Walk through RelNode tree and construct a StageNode tree.
- StageNode globalStageRoot = relNodeToStageNode(relRootNode);
+ PlanNode globalStageRoot = relNodeToStageNode(relRootNode);
// Fragment the stage tree into multiple stages.
globalStageRoot = globalStageRoot.visit(StageFragmenter.INSTANCE, new StageFragmenter.Context());
// global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
// receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
- StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
- 0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
+ PlanNode globalSenderNode =
+ new MailboxSendNode(globalStageRoot.getPlanFragmentId(), globalStageRoot.getDataSchema(),
+ 0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
globalSenderNode.addInput(globalStageRoot);
- StageNode globalReceiverNode =
- new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
+ PlanNode globalReceiverNode =
+ new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getPlanFragmentId(),
RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false, false, globalSenderNode);
// perform physical plan conversion and assign workers to each stage.
@@ -93,13 +94,13 @@ public class StagePlanner {
// non-threadsafe
// TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
- private StageNode relNodeToStageNode(RelNode node) {
- StageNode stageNode = RelToStageConverter.toStageNode(node, -1);
+ private PlanNode relNodeToStageNode(RelNode node) {
+ PlanNode planNode = RelToStageConverter.toStageNode(node, -1);
List<RelNode> inputs = node.getInputs();
for (RelNode input : inputs) {
- stageNode.addInput(relNodeToStageNode(input));
+ planNode.addInput(relNodeToStageNode(input));
}
- return stageNode;
+ return planNode;
}
// TODO: Switch to Worker SPI to avoid multiple-places where workers are assigned.
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 227aa5429b..aa587cf974 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.calcite.util.Pair;
import org.apache.pinot.query.context.PlannerContext;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.WorkerManager;
@@ -37,7 +37,7 @@ public class DispatchablePlanContext {
private final PlannerContext _plannerContext;
private final Map<Integer, DispatchablePlanMetadata> _dispatchablePlanMetadataMap;
- private final Map<Integer, StageNode> _dispatchablePlanStageRootMap;
+ private final Map<Integer, PlanNode> _dispatchablePlanStageRootMap;
public DispatchablePlanContext(WorkerManager workerManager, long requestId, PlannerContext plannerContext,
List<Pair<Integer, String>> resultFields, Set<String> tableNames) {
@@ -75,7 +75,7 @@ public class DispatchablePlanContext {
return _dispatchablePlanMetadataMap;
}
- public Map<Integer, StageNode> getDispatchablePlanStageRootMap() {
+ public Map<Integer, PlanNode> getDispatchablePlanStageRootMap() {
return _dispatchablePlanStageRootMap;
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 7575c4dc8e..36d677d362 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -29,7 +29,7 @@ import org.apache.pinot.query.routing.QueryServerInstance;
/**
- * The {@code StageMetadata} info contains the information for dispatching a particular stage.
+ * The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
*
* <p>It contains information aboute:
* <ul>
@@ -50,7 +50,7 @@ public class DispatchablePlanMetadata implements Serializable {
private Map<Integer, Map<String, List<String>>> _workerIdToSegmentsMap;
// used for build mailboxes between workers.
- // workerId -> {stageId -> mailbox list}
+ // workerId -> {planFragmentId -> mailbox list}
private Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
// time boundary info
@@ -100,8 +100,9 @@ public class DispatchablePlanMetadata implements Serializable {
_workerIdToMailboxesMap.putAll(workerIdToMailboxesMap);
}
- public void addWorkerIdToMailBoxIdsMap(int stageId, Map<Integer, MailboxMetadata> stageIdToMailboxesMap) {
- _workerIdToMailboxesMap.put(stageId, stageIdToMailboxesMap);
+ public void addWorkerIdToMailBoxIdsMap(int planFragmentId,
+ Map<Integer, MailboxMetadata> planFragmentIdToMailboxesMap) {
+ _workerIdToMailboxesMap.put(planFragmentId, planFragmentIdToMailboxesMap);
}
public Map<QueryServerInstance, List<Integer>> getServerInstanceToWorkerIdMap() {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index 5258fc5637..964d332127 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -19,23 +19,23 @@
package org.apache.pinot.query.planner.physical;
import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
-
-
-public class DispatchablePlanVisitor implements StageNodeVisitor<Void, DispatchablePlanContext> {
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+
+
+public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, DispatchablePlanContext> {
public static final DispatchablePlanVisitor INSTANCE = new DispatchablePlanVisitor();
private DispatchablePlanVisitor() {
@@ -43,17 +43,17 @@ public class DispatchablePlanVisitor implements StageNodeVisitor<Void, Dispatcha
/**
* Entry point for attaching dispatch metadata to a query plan. It walks through the plan via the global
- * {@link StageNode} root of the query and:
+ * {@link PlanNode} root of the query and:
* <ul>
- * <li>break down the {@link StageNode}s into Stages that can run on a single worker.</li>
- * <li>each stage is represented by a subset of {@link StageNode}s without data exchange.</li>
+ * <li>break down the {@link PlanNode}s into Stages that can run on a single worker.</li>
+ * <li>each stage is represented by a subset of {@link PlanNode}s without data exchange.</li>
* <li>attach worker execution information including physical server address, worker ID to each stage.</li>
* </ul>
*
* @param globalReceiverNode the entrypoint of the stage plan.
* @param dispatchablePlanContext dispatchable plan context used to record the walk of the stage node tree.
*/
- public QueryPlan constructDispatchablePlan(StageNode globalReceiverNode,
+ public QueryPlan constructDispatchablePlan(PlanNode globalReceiverNode,
DispatchablePlanContext dispatchablePlanContext) {
// 1. start by visiting the stage root.
globalReceiverNode.visit(DispatchablePlanVisitor.INSTANCE, dispatchablePlanContext);
@@ -80,16 +80,17 @@ public class DispatchablePlanVisitor implements StageNodeVisitor<Void, Dispatcha
dispatchablePlanContext.getDispatchablePlanMetadataMap());
}
- private static DispatchablePlanMetadata getOrCreateDispatchablePlanMetadata(StageNode node,
+ private static DispatchablePlanMetadata getOrCreateDispatchablePlanMetadata(PlanNode node,
DispatchablePlanContext context) {
- return context.getDispatchablePlanMetadataMap().computeIfAbsent(node.getStageId(),
+ return context.getDispatchablePlanMetadataMap().computeIfAbsent(node.getPlanFragmentId(),
(id) -> new DispatchablePlanMetadata());
}
- private static void computeWorkerAssignment(StageNode node, DispatchablePlanContext context) {
- int stageId = node.getStageId();
- context.getWorkerManager().assignWorkerToStage(stageId, context.getDispatchablePlanMetadataMap().get(stageId),
- context.getRequestId(), context.getPlannerContext().getOptions(), context.getTableNames());
+ private static void computeWorkerAssignment(PlanNode node, DispatchablePlanContext context) {
+ int planFragmentId = node.getPlanFragmentId();
+ context.getWorkerManager()
+ .assignWorkerToStage(planFragmentId, context.getDispatchablePlanMetadataMap().get(planFragmentId),
+ context.getRequestId(), context.getPlannerContext().getOptions(), context.getTableNames());
}
@Override
@@ -150,7 +151,7 @@ public class DispatchablePlanVisitor implements StageNodeVisitor<Void, Dispatcha
node.getInputs().get(0).visit(this, context);
getOrCreateDispatchablePlanMetadata(node, context);
- context.getDispatchablePlanStageRootMap().put(node.getStageId(), node);
+ context.getDispatchablePlanStageRootMap().put(node.getPlanFragmentId(), node);
computeWorkerAssignment(node, context);
return null;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index c41423cc04..5b3040fd35 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -21,10 +21,10 @@ package org.apache.pinot.query.planner.physical;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.query.planner.stage.DefaultPostOrderTraversalVisitor;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -34,12 +34,12 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
public static final MailboxAssignmentVisitor INSTANCE = new MailboxAssignmentVisitor();
@Override
- public Void process(StageNode node, DispatchablePlanContext context) {
+ public Void process(PlanNode node, DispatchablePlanContext context) {
if (node instanceof MailboxSendNode || node instanceof MailboxReceiveNode) {
int receiverStageId =
- isMailboxReceiveNode(node) ? node.getStageId() : ((MailboxSendNode) node).getReceiverStageId();
+ isMailboxReceiveNode(node) ? node.getPlanFragmentId() : ((MailboxSendNode) node).getReceiverStageId();
int senderStageId =
- isMailboxReceiveNode(node) ? ((MailboxReceiveNode) node).getSenderStageId() : node.getStageId();
+ isMailboxReceiveNode(node) ? ((MailboxReceiveNode) node).getSenderStageId() : node.getPlanFragmentId();
DispatchablePlanMetadata receiverStagePlanMetadata =
context.getDispatchablePlanMetadataMap().get(receiverStageId);
DispatchablePlanMetadata senderStagePlanMetadata = context.getDispatchablePlanMetadataMap().get(senderStageId);
@@ -71,20 +71,21 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
return null;
}
- private static boolean isMailboxReceiveNode(StageNode node) {
+ private static boolean isMailboxReceiveNode(PlanNode node) {
return node instanceof MailboxReceiveNode;
}
- private MailboxMetadata getMailboxMetadata(DispatchablePlanMetadata stagePlanMetadata, int stageId, int workerId) {
+ private MailboxMetadata getMailboxMetadata(DispatchablePlanMetadata dispatchablePlanMetadata, int planFragmentId,
+ int workerId) {
Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailBoxIdsMap =
- stagePlanMetadata.getWorkerIdToMailBoxIdsMap();
+ dispatchablePlanMetadata.getWorkerIdToMailBoxIdsMap();
if (!workerIdToMailBoxIdsMap.containsKey(workerId)) {
workerIdToMailBoxIdsMap.put(workerId, new HashMap<>());
}
- Map<Integer, MailboxMetadata> stageToMailboxMetadataMap = workerIdToMailBoxIdsMap.get(workerId);
- if (!stageToMailboxMetadataMap.containsKey(stageId)) {
- stageToMailboxMetadataMap.put(stageId, new MailboxMetadata());
+ Map<Integer, MailboxMetadata> planFragmentToMailboxMetadataMap = workerIdToMailBoxIdsMap.get(workerId);
+ if (!planFragmentToMailboxMetadataMap.containsKey(planFragmentId)) {
+ planFragmentToMailboxMetadataMap.put(planFragmentId, new MailboxMetadata());
}
- return stageToMailboxMetadataMap.get(stageId);
+ return planFragmentToMailboxMetadataMap.get(planFragmentId);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
index 23a752a991..d8c6283335 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
@@ -24,21 +24,21 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
/**
* Context used for running the {@link GreedyShuffleRewriteVisitor}.
*/
class GreedyShuffleRewriteContext {
- private final Map<Integer, StageNode> _rootStageNode;
- private final Map<Integer, List<StageNode>> _leafNodes;
+ private final Map<Integer, PlanNode> _rootStageNode;
+ private final Map<Integer, List<PlanNode>> _leafNodes;
private final Set<Integer> _joinStages;
private final Set<Integer> _setOpStages;
/**
- * A map to track the partition keys for the input to the MailboxSendNode of a given stageId. This is needed
+ * A map to track the partition keys for the input to the MailboxSendNode of a given planFragmentId. This is needed
* because the {@link GreedyShuffleRewriteVisitor} doesn't determine the distribution of the sender if the receiver
* is a join-stage.
*/
@@ -53,75 +53,75 @@ class GreedyShuffleRewriteContext {
}
/**
- * Returns the root StageNode for a given stageId.
+ * Returns the root StageNode for a given planFragmentId.
*/
- StageNode getRootStageNode(Integer stageId) {
- return _rootStageNode.get(stageId);
+ PlanNode getRootStageNode(Integer planFragmentId) {
+ return _rootStageNode.get(planFragmentId);
}
/**
- * Sets the root StageNode for a given stageId.
+ * Sets the root StageNode for a given planFragmentId.
*/
- void setRootStageNode(Integer stageId, StageNode stageNode) {
- _rootStageNode.put(stageId, stageNode);
+ void setRootStageNode(Integer planFragmentId, PlanNode planNode) {
+ _rootStageNode.put(planFragmentId, planNode);
}
/**
- * Returns all the leaf StageNode for a given stageId.
+ * Returns all the leaf StageNode for a given planFragmentId.
*/
- List<StageNode> getLeafNodes(Integer stageId) {
- return _leafNodes.get(stageId);
+ List<PlanNode> getLeafNodes(Integer planFragmentId) {
+ return _leafNodes.get(planFragmentId);
}
/**
- * Adds a leaf StageNode for a given stageId.
+ * Adds a leaf PlanNode for a given planFragmentId.
*/
- void addLeafNode(Integer stageId, StageNode stageNode) {
- _leafNodes.computeIfAbsent(stageId, (x) -> new ArrayList<>()).add(stageNode);
+ void addLeafNode(Integer planFragmentId, PlanNode planNode) {
+ _leafNodes.computeIfAbsent(planFragmentId, (x) -> new ArrayList<>()).add(planNode);
}
/**
- * {@link GreedyShuffleRewriteContext} allows checking whether a given stageId has a JoinNode or not. During
- * pre-computation, this method may be used to mark that the given stageId has a JoinNode.
+ * {@link GreedyShuffleRewriteContext} allows checking whether a given planFragmentId has a JoinNode or not. During
+ * pre-computation, this method may be used to mark that the given planFragmentId has a JoinNode.
*/
- void markJoinStage(Integer stageId) {
- _joinStages.add(stageId);
+ void markJoinStage(Integer planFragmentId) {
+ _joinStages.add(planFragmentId);
}
/**
- * Returns true if the given stageId has a JoinNode.
+ * Returns true if the given planFragmentId has a JoinNode.
*/
- boolean isJoinStage(Integer stageId) {
- return _joinStages.contains(stageId);
+ boolean isJoinStage(Integer planFragmentId) {
+ return _joinStages.contains(planFragmentId);
}
-
/**
- * {@link GreedyShuffleRewriteContext} allows checking whether a given stageId has a SetOpNode or not. During
- * pre-computation, this method may be used to mark that the given stageId has a SetOpNode.
+ * {@link GreedyShuffleRewriteContext} allows checking whether a given planFragmentId has a SetOpNode or not. During
+ * pre-computation, this method may be used to mark that the given planFragmentId has a SetOpNode.
*/
- void markSetOpStage(Integer stageId) {
- _setOpStages.add(stageId);
+ void markSetOpStage(Integer planFragmentId) {
+ _setOpStages.add(planFragmentId);
}
/**
- * Returns true if the given stageId has a SetOpNode.
+ * Returns true if the given planFragmentId has a SetOpNode.
*/
- boolean isSetOpStage(Integer stageId) {
- return _setOpStages.contains(stageId);
+ boolean isSetOpStage(Integer planFragmentId) {
+ return _setOpStages.contains(planFragmentId);
}
/**
- * This returns the {@link Set<ColocationKey>} for the input to the {@link MailboxSendNode} of the given stageId.
+ * This returns the {@link Set<ColocationKey>} for the input to the {@link MailboxSendNode} of the given
+ * planFragmentId.
*/
- Set<ColocationKey> getColocationKeys(Integer stageId) {
- return _senderInputColocationKeys.get(stageId);
+ Set<ColocationKey> getColocationKeys(Integer planFragmentId) {
+ return _senderInputColocationKeys.get(planFragmentId);
}
/**
- * This sets the {@link Set<ColocationKey>} for the input to the {@link MailboxSendNode} of the given stageId.
+ * This sets the {@link Set<ColocationKey>} for the input to the {@link MailboxSendNode} of the given planFragmentId.
*/
- void setColocationKeys(Integer stageId, Set<ColocationKey> colocationKeys) {
- _senderInputColocationKeys.put(stageId, colocationKeys);
+ void setColocationKeys(Integer planFragmentId, Set<ColocationKey> colocationKeys) {
+ _senderInputColocationKeys.put(planFragmentId, colocationKeys);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
index d42fcfa4ce..f8d7e30842 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
@@ -18,12 +18,12 @@
*/
package org.apache.pinot.query.planner.physical.colocated;
-import org.apache.pinot.query.planner.stage.DefaultPostOrderTraversalVisitor;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
/**
@@ -32,44 +32,44 @@ import org.apache.pinot.query.planner.stage.TableScanNode;
class GreedyShuffleRewritePreComputeVisitor
extends DefaultPostOrderTraversalVisitor<Integer, GreedyShuffleRewriteContext> {
- static GreedyShuffleRewriteContext preComputeContext(StageNode rootStageNode) {
+ static GreedyShuffleRewriteContext preComputeContext(PlanNode rootPlanNode) {
GreedyShuffleRewriteContext context = new GreedyShuffleRewriteContext();
- rootStageNode.visit(new GreedyShuffleRewritePreComputeVisitor(), context);
+ rootPlanNode.visit(new GreedyShuffleRewritePreComputeVisitor(), context);
return context;
}
@Override
- public Integer process(StageNode stageNode, GreedyShuffleRewriteContext context) {
- int currentStageId = stageNode.getStageId();
- context.setRootStageNode(currentStageId, stageNode);
+ public Integer process(PlanNode planNode, GreedyShuffleRewriteContext context) {
+ int currentStageId = planNode.getPlanFragmentId();
+ context.setRootStageNode(currentStageId, planNode);
return 0;
}
@Override
public Integer visitJoin(JoinNode joinNode, GreedyShuffleRewriteContext context) {
super.visitJoin(joinNode, context);
- context.markJoinStage(joinNode.getStageId());
+ context.markJoinStage(joinNode.getPlanFragmentId());
return 0;
}
@Override
- public Integer visitMailboxReceive(MailboxReceiveNode stageNode, GreedyShuffleRewriteContext context) {
- super.visitMailboxReceive(stageNode, context);
- context.addLeafNode(stageNode.getStageId(), stageNode);
+ public Integer visitMailboxReceive(MailboxReceiveNode planNode, GreedyShuffleRewriteContext context) {
+ super.visitMailboxReceive(planNode, context);
+ context.addLeafNode(planNode.getPlanFragmentId(), planNode);
return 0;
}
@Override
- public Integer visitTableScan(TableScanNode stageNode, GreedyShuffleRewriteContext context) {
- super.visitTableScan(stageNode, context);
- context.addLeafNode(stageNode.getStageId(), stageNode);
+ public Integer visitTableScan(TableScanNode planNode, GreedyShuffleRewriteContext context) {
+ super.visitTableScan(planNode, context);
+ context.addLeafNode(planNode.getPlanFragmentId(), planNode);
return 0;
}
@Override
public Integer visitSetOp(SetOpNode setOpNode, GreedyShuffleRewriteContext context) {
super.visitSetOp(setOpNode, context);
- context.markSetOpStage(setOpNode.getStageId());
+ context.markSetOpStage(setOpNode.getPlanFragmentId());
return 0;
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 6d58bde3af..99e2f2ab68 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -34,20 +34,20 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
*
* Also see: {@link ColocationKey} for its definition.
*/
-public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<ColocationKey>, GreedyShuffleRewriteContext> {
+public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<ColocationKey>, GreedyShuffleRewriteContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(GreedyShuffleRewriteVisitor.class);
private final TableCache _tableCache;
@@ -75,14 +75,14 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
private boolean _canSkipShuffleForJoin;
public static void optimizeShuffles(QueryPlan queryPlan, TableCache tableCache) {
- StageNode rootStageNode = queryPlan.getQueryStageMap().get(0);
+ PlanNode rootPlanNode = queryPlan.getQueryStageMap().get(0);
Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap = queryPlan.getDispatchablePlanMetadataMap();
- GreedyShuffleRewriteContext context = GreedyShuffleRewritePreComputeVisitor.preComputeContext(rootStageNode);
- // This assumes that if stageId(S1) > stageId(S2), then S1 is not an ancestor of S2.
+ GreedyShuffleRewriteContext context = GreedyShuffleRewritePreComputeVisitor.preComputeContext(rootPlanNode);
+ // This assumes that if planFragmentId(S1) > planFragmentId(S2), then S1 is not an ancestor of S2.
// TODO: If this assumption is wrong, we can compute the reverse topological ordering explicitly.
- for (int stageId = dispatchablePlanMetadataMap.size() - 1; stageId >= 0; stageId--) {
- StageNode stageNode = context.getRootStageNode(stageId);
- stageNode.visit(new GreedyShuffleRewriteVisitor(tableCache, dispatchablePlanMetadataMap), context);
+ for (int planFragmentId = dispatchablePlanMetadataMap.size() - 1; planFragmentId >= 0; planFragmentId--) {
+ PlanNode planNode = context.getRootStageNode(planFragmentId);
+ planNode.visit(new GreedyShuffleRewriteVisitor(tableCache, dispatchablePlanMetadataMap), context);
}
}
@@ -118,7 +118,8 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
@Override
public Set<ColocationKey> visitJoin(JoinNode node, GreedyShuffleRewriteContext context) {
List<MailboxReceiveNode> innerLeafNodes =
- context.getLeafNodes(node.getStageId()).stream().map(x -> (MailboxReceiveNode) x).collect(Collectors.toList());
+ context.getLeafNodes(node.getPlanFragmentId()).stream().map(x -> (MailboxReceiveNode) x)
+ .collect(Collectors.toList());
Preconditions.checkState(innerLeafNodes.size() == 2);
// Multiple checks need to be made to ensure that shuffle can be skipped for a join.
@@ -127,7 +128,8 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
// Step-2: Only if the servers assigned to both left and right nodes are equal and the servers assigned to the join
// stage are a superset of those servers, can we skip shuffles.
canColocate =
- canColocate && canServerAssignmentAllowShuffleSkip(node.getStageId(), innerLeafNodes.get(0).getSenderStageId(),
+ canColocate && canServerAssignmentAllowShuffleSkip(node.getPlanFragmentId(),
+ innerLeafNodes.get(0).getSenderStageId(),
innerLeafNodes.get(1).getSenderStageId());
// Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs, check whether the key partitioning can
// allow shuffle skip.
@@ -140,7 +142,7 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0), innerLeafNodes.get(1), context);
if (canColocate) {
// If shuffle can be skipped, reassign servers.
- _dispatchablePlanMetadataMap.get(node.getStageId()).setServerInstanceToWorkerIdMap(
+ _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
_dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getServerInstanceToWorkerIdMap());
_canSkipShuffleForJoin = true;
}
@@ -168,18 +170,19 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
Set<ColocationKey> oldColocationKeys = context.getColocationKeys(node.getSenderStageId());
// If the current stage is not a join-stage, then we already know sender's distribution
- if (!context.isJoinStage(node.getStageId())) {
+ if (!context.isJoinStage(node.getPlanFragmentId())) {
if (selector == null) {
return new HashSet<>();
- } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getStageId(),
+ } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getPlanFragmentId(),
node.getSenderStageId())) {
node.setExchangeType(RelDistribution.Type.SINGLETON);
- _dispatchablePlanMetadataMap.get(node.getStageId()).setServerInstanceToWorkerIdMap(
+ _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
_dispatchablePlanMetadataMap.get(node.getSenderStageId()).getServerInstanceToWorkerIdMap());
return oldColocationKeys;
}
// This means we can't skip shuffle and there's a partitioning enforced by receiver.
- int numPartitions = _dispatchablePlanMetadataMap.get(node.getStageId()).getServerInstanceToWorkerIdMap().size();
+ int numPartitions =
+ _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size();
List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
.map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
return new HashSet<>(colocationKeys);
@@ -195,7 +198,8 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
return new HashSet<>();
}
// This means we can't skip shuffle and there's a partitioning enforced by receiver.
- int numPartitions = _dispatchablePlanMetadataMap.get(node.getStageId()).getServerInstanceToWorkerIdMap().size();
+ int numPartitions =
+ _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size();
List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
.map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
return new HashSet<>(colocationKeys);
@@ -210,19 +214,19 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
// If receiver is not a join-stage, then we can determine distribution type now.
if (!context.isJoinStage(node.getReceiverStageId())) {
Set<ColocationKey> colocationKeys;
- if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getStageId())) {
+ if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getPlanFragmentId())) {
// Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side.
node.setExchangeType(RelDistribution.Type.SINGLETON);
colocationKeys = oldColocationKeys;
} else {
colocationKeys = new HashSet<>();
}
- context.setColocationKeys(node.getStageId(), colocationKeys);
+ context.setColocationKeys(node.getPlanFragmentId(), colocationKeys);
return colocationKeys;
}
// If receiver is a join-stage, remember partition-keys of the child node of MailboxSendNode.
Set<ColocationKey> mailboxSendColocationKeys = canSkipShuffleBasic ? oldColocationKeys : new HashSet<>();
- context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys);
+ context.setColocationKeys(node.getPlanFragmentId(), mailboxSendColocationKeys);
return mailboxSendColocationKeys;
}
@@ -371,7 +375,7 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
private static boolean partitionKeyConditionForJoin(MailboxReceiveNode mailboxReceiveNode,
MailboxSendNode mailboxSendNode, GreedyShuffleRewriteContext context) {
// First check ColocationKeyCondition for the sender <--> sender.getInputs().get(0) pair
- Set<ColocationKey> oldColocationKeys = context.getColocationKeys(mailboxSendNode.getStageId());
+ Set<ColocationKey> oldColocationKeys = context.getColocationKeys(mailboxSendNode.getPlanFragmentId());
KeySelector<Object[], Object[]> selector = mailboxSendNode.getPartitionKeySelector();
if (!colocationKeyCondition(oldColocationKeys, selector)) {
return false;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
similarity index 72%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
index c2f0e4b1be..ae696ac7f9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import java.util.ArrayList;
import java.util.List;
@@ -26,40 +26,40 @@ import org.apache.pinot.query.planner.serde.ProtoSerializable;
import org.apache.pinot.query.planner.serde.ProtoSerializationUtils;
-public abstract class AbstractStageNode implements StageNode, ProtoSerializable {
+public abstract class AbstractPlanNode implements PlanNode, ProtoSerializable {
- protected int _stageId;
- protected final List<StageNode> _inputs;
+ protected int _planFragmentId;
+ protected final List<PlanNode> _inputs;
protected DataSchema _dataSchema;
- public AbstractStageNode(int stageId) {
- this(stageId, null);
+ public AbstractPlanNode(int planFragmentId) {
+ this(planFragmentId, null);
}
- public AbstractStageNode(int stageId, DataSchema dataSchema) {
- _stageId = stageId;
+ public AbstractPlanNode(int planFragmentId, DataSchema dataSchema) {
+ _planFragmentId = planFragmentId;
_dataSchema = dataSchema;
_inputs = new ArrayList<>();
}
@Override
- public int getStageId() {
- return _stageId;
+ public int getPlanFragmentId() {
+ return _planFragmentId;
}
@Override
- public void setStageId(int stageId) {
- _stageId = stageId;
+ public void setPlanFragmentId(int planFragmentId) {
+ _planFragmentId = planFragmentId;
}
@Override
- public List<StageNode> getInputs() {
+ public List<PlanNode> getInputs() {
return _inputs;
}
@Override
- public void addInput(StageNode stageNode) {
- _inputs.add(stageNode);
+ public void addInput(PlanNode planNode) {
+ _inputs.add(planNode);
}
@Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AggregateNode.java
similarity index 86%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AggregateNode.java
index ed21c674d6..6776d9e769 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AggregateNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import com.google.common.base.Preconditions;
import java.util.List;
@@ -29,7 +29,7 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class AggregateNode extends AbstractStageNode {
+public class AggregateNode extends AbstractPlanNode {
public static final RelHint FINAL_STAGE_HINT = RelHint.builder(
PinotHintStrategyTable.INTERNAL_AGG_FINAL_STAGE).build();
public static final RelHint INTERMEDIATE_STAGE_HINT = RelHint.builder(
@@ -41,13 +41,14 @@ public class AggregateNode extends AbstractStageNode {
@ProtoProperties
private List<RexExpression> _groupSet;
- public AggregateNode(int stageId) {
- super(stageId);
+ public AggregateNode(int planFragmentId) {
+ super(planFragmentId);
}
- public AggregateNode(int stageId, DataSchema dataSchema, List<AggregateCall> aggCalls, List<RexExpression> groupSet,
+ public AggregateNode(int planFragmentId, DataSchema dataSchema, List<AggregateCall> aggCalls,
+ List<RexExpression> groupSet,
List<RelHint> relHints) {
- super(stageId, dataSchema);
+ super(planFragmentId, dataSchema);
_aggCalls = aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
_groupSet = groupSet;
_relHints = relHints;
@@ -81,7 +82,7 @@ public class AggregateNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitAggregate(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java
similarity index 95%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java
index 4c93f9d289..e1ca593b02 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java
@@ -16,15 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
/**
* A base implementation of a visitor pattern where the children of a given node are visited first and after that the
* node is processed (post-order traversal).
*/
-public abstract class DefaultPostOrderTraversalVisitor<T, C> implements StageNodeVisitor<T, C> {
+public abstract class DefaultPostOrderTraversalVisitor<T, C> implements PlanNodeVisitor<T, C> {
- public abstract T process(StageNode stageNode, C context);
+ public abstract T process(PlanNode planNode, C context);
@Override
public T visitAggregate(AggregateNode node, C context) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
similarity index 90%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index 328dd8568c..04adca3eee 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import java.util.List;
import org.apache.calcite.rel.RelDistribution;
@@ -29,7 +29,7 @@ import org.apache.pinot.query.planner.serde.ProtoProperties;
* ExchangeNode represents the exchange stage in the query plan.
* It is used to exchange the data between the instances.
*/
-public class ExchangeNode extends AbstractStageNode {
+public class ExchangeNode extends AbstractPlanNode {
@ProtoProperties
private RelDistribution.Type _exchangeType;
@@ -46,8 +46,8 @@ public class ExchangeNode extends AbstractStageNode {
@ProtoProperties
private List<RelFieldCollation> _collations;
- public ExchangeNode(int stageId) {
- super(stageId);
+ public ExchangeNode(int planFragmentId) {
+ super(planFragmentId);
}
public ExchangeNode(int currentStageId, DataSchema dataSchema, RelDistribution distribution,
@@ -67,7 +67,7 @@ public class ExchangeNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitExchange(this, context);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/FilterNode.java
similarity index 86%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/FilterNode.java
index 52ed004da1..15d4ff15c8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/FilterNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import org.apache.calcite.rex.RexNode;
import org.apache.pinot.common.utils.DataSchema;
@@ -24,12 +24,12 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class FilterNode extends AbstractStageNode {
+public class FilterNode extends AbstractPlanNode {
@ProtoProperties
private RexExpression _condition;
- public FilterNode(int stageId) {
- super(stageId);
+ public FilterNode(int planFragmentId) {
+ super(planFragmentId);
}
public FilterNode(int currentStageId, DataSchema dataSchema, RexNode condition) {
@@ -47,7 +47,7 @@ public class FilterNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitFilter(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
similarity index 88%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
index 3b127d6568..6d089c6239 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import java.util.Arrays;
import java.util.List;
@@ -28,7 +28,7 @@ import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class JoinNode extends AbstractStageNode {
+public class JoinNode extends AbstractPlanNode {
@ProtoProperties
private JoinRelType _joinRelType;
@ProtoProperties
@@ -40,13 +40,13 @@ public class JoinNode extends AbstractStageNode {
@ProtoProperties
private List<String> _rightColumnNames;
- public JoinNode(int stageId) {
- super(stageId);
+ public JoinNode(int planFragmentId) {
+ super(planFragmentId);
}
- public JoinNode(int stageId, DataSchema dataSchema, DataSchema leftSchema, DataSchema rightSchema,
+ public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema leftSchema, DataSchema rightSchema,
JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause) {
- super(stageId, dataSchema);
+ super(planFragmentId, dataSchema);
_leftColumnNames = Arrays.asList(leftSchema.getColumnNames());
_rightColumnNames = Arrays.asList(rightSchema.getColumnNames());
_joinRelType = joinRelType;
@@ -80,7 +80,7 @@ public class JoinNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitJoin(this, context);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
similarity index 89%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
index 13eb8b5296..3597651fdd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
@@ -32,7 +32,7 @@ import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class MailboxReceiveNode extends AbstractStageNode {
+public class MailboxReceiveNode extends AbstractPlanNode {
@ProtoProperties
private int _senderStageId;
@ProtoProperties
@@ -50,17 +50,17 @@ public class MailboxReceiveNode extends AbstractStageNode {
// this is only available during planning and should not be relied
// on in any post-serialization code
- private transient StageNode _sender;
+ private transient PlanNode _sender;
- public MailboxReceiveNode(int stageId) {
- super(stageId);
+ public MailboxReceiveNode(int planFragmentId) {
+ super(planFragmentId);
}
- public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
+ public MailboxReceiveNode(int planFragmentId, DataSchema dataSchema, int senderStageId,
RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
@Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver,
- StageNode sender) {
- super(stageId, dataSchema);
+ PlanNode sender) {
+ super(planFragmentId, dataSchema);
_senderStageId = senderStageId;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
@@ -117,7 +117,7 @@ public class MailboxReceiveNode extends AbstractStageNode {
return _isSortOnReceiver;
}
- public StageNode getSender() {
+ public PlanNode getSender() {
return _sender;
}
@@ -127,7 +127,7 @@ public class MailboxReceiveNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitMailboxReceive(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
similarity index 90%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index 54f80c6794..08624959b7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
@@ -32,7 +32,7 @@ import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class MailboxSendNode extends AbstractStageNode {
+public class MailboxSendNode extends AbstractPlanNode {
@ProtoProperties
private int _receiverStageId;
@ProtoProperties
@@ -46,14 +46,14 @@ public class MailboxSendNode extends AbstractStageNode {
@ProtoProperties
private boolean _isSortOnSender;
- public MailboxSendNode(int stageId) {
- super(stageId);
+ public MailboxSendNode(int planFragmentId) {
+ super(planFragmentId);
}
- public MailboxSendNode(int stageId, DataSchema dataSchema, int receiverStageId,
+ public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverStageId,
RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
@Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender) {
- super(stageId, dataSchema);
+ super(planFragmentId, dataSchema);
_receiverStageId = receiverStageId;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
@@ -111,7 +111,7 @@ public class MailboxSendNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitMailboxSend(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNode.java
similarity index 71%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNode.java
index ae851f449b..1e6aa32d0e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import java.io.Serializable;
import java.util.List;
@@ -24,22 +24,22 @@ import org.apache.pinot.common.utils.DataSchema;
/**
- * Stage Node is a serializable version of the {@link org.apache.calcite.rel.RelNode}.
+ * PlanNode is a serializable version of the {@link org.apache.calcite.rel.RelNode}.
*
- * TODO: stage node currently uses java.io.Serializable as its serialization format.
+ * TODO: PlanNode currently uses java.io.Serializable as its serialization format.
* We should experiment with other type of serialization format for better performance.
* Essentially what we need is a way to exclude the planner context from the RelNode but only keeps the
- * constructed relational content because we will no longer revisit the planner after stage is created.
+ * constructed relational content because we will no longer revisit the planner after PlanFragment is created.
*/
-public interface StageNode extends Serializable {
+public interface PlanNode extends Serializable {
- int getStageId();
+ int getPlanFragmentId();
- void setStageId(int stageId);
+ void setPlanFragmentId(int planFragmentId);
- List<StageNode> getInputs();
+ List<PlanNode> getInputs();
- void addInput(StageNode stageNode);
+ void addInput(PlanNode planNode);
DataSchema getDataSchema();
@@ -47,5 +47,5 @@ public interface StageNode extends Serializable {
String explain();
- <T, C> T visit(StageNodeVisitor<T, C> visitor, C context);
+ <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
similarity index 79%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
index f72e9540ec..d917cab021 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
@@ -16,26 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
+import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
import org.apache.pinot.query.planner.QueryPlan;
/**
- * {@code StageNodeVisitor} is a skeleton class that allows for implementations of {@code StageNode}
- * tree traversals using the {@link StageNode#visit(StageNodeVisitor, Object)} method. There is no
+ * {@code PlanNodeVisitor} is a skeleton class that allows for implementations of {@code PlanNode}
+ * tree traversals using the {@link PlanNode#visit(PlanNodeVisitor, Object)} method. There is no
* enforced traversal order, and should be implemented by subclasses.
*
* <p>It is recommended that implementors use private constructors and static methods to access main
- * functionality (see {@link org.apache.pinot.query.planner.ExplainPlanStageVisitor#explain(QueryPlan)}
+ * functionality (see {@link ExplainPlanPlanVisitor#explain(QueryPlan)}
* as an example of a usage of this pattern.
*
- * @param <T> the return type for all visits
- * @param <C> a Context that will be passed as the second parameter to {@code StageNode#visit},
+ * @param <T> the return type for all visitsPlanNodeVisitor
+ * @param <C> a Context that will be passed as the second parameter to {@code PlanNode#visit},
* implementors can decide how they want to use this context (e.g. whether or not
* it can be modified in place or whether it's an immutable context)
*/
-public interface StageNodeVisitor<T, C> {
+public interface PlanNodeVisitor<T, C> {
T visitAggregate(AggregateNode node, C context);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ProjectNode.java
similarity index 86%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ProjectNode.java
index 8371dda609..9333e89893 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ProjectNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import java.util.List;
import java.util.stream.Collectors;
@@ -26,12 +26,12 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class ProjectNode extends AbstractStageNode {
+public class ProjectNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _projects;
- public ProjectNode(int stageId) {
- super(stageId);
+ public ProjectNode(int planFragmentId) {
+ super(planFragmentId);
}
public ProjectNode(int currentStageId, DataSchema dataSchema, List<RexNode> projects) {
super(currentStageId, dataSchema);
@@ -48,7 +48,7 @@ public class ProjectNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitProject(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SetOpNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
similarity index 84%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SetOpNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
index aa7003449f..1eefcb93de 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SetOpNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.logical.LogicalIntersect;
@@ -29,7 +29,7 @@ import org.apache.pinot.query.planner.serde.ProtoProperties;
/**
* Set operation node is used to represent UNION, INTERSECT, EXCEPT.
*/
-public class SetOpNode extends AbstractStageNode {
+public class SetOpNode extends AbstractPlanNode {
@ProtoProperties
private SetOpType _setOpType;
@@ -37,12 +37,12 @@ public class SetOpNode extends AbstractStageNode {
@ProtoProperties
private boolean _all;
- public SetOpNode(int stageId) {
- super(stageId);
+ public SetOpNode(int planFragmentId) {
+ super(planFragmentId);
}
- public SetOpNode(SetOpType setOpType, int stageId, DataSchema dataSchema, boolean all) {
- super(stageId, dataSchema);
+ public SetOpNode(SetOpType setOpType, int planFragmentId, DataSchema dataSchema, boolean all) {
+ super(planFragmentId, dataSchema);
_setOpType = setOpType;
_all = all;
}
@@ -61,7 +61,7 @@ public class SetOpNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitSetOp(this, context);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
similarity index 84%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
index 4ffe901b20..6bc91430f3 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import java.util.ArrayList;
import java.util.List;
@@ -26,7 +26,7 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class SortNode extends AbstractStageNode {
+public class SortNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _collationKeys;
@ProtoProperties
@@ -36,12 +36,13 @@ public class SortNode extends AbstractStageNode {
@ProtoProperties
private int _offset;
- public SortNode(int stageId) {
- super(stageId);
+ public SortNode(int planFragmentId) {
+ super(planFragmentId);
}
- public SortNode(int stageId, List<RelFieldCollation> fieldCollations, int fetch, int offset, DataSchema dataSchema) {
- super(stageId, dataSchema);
+ public SortNode(int planFragmentId, List<RelFieldCollation> fieldCollations, int fetch, int offset,
+ DataSchema dataSchema) {
+ super(planFragmentId, dataSchema);
_collationDirections = new ArrayList<>(fieldCollations.size());
_collationKeys = new ArrayList<>(fieldCollations.size());
_fetch = fetch;
@@ -76,7 +77,7 @@ public class SortNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitSort(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/StageNodeSerDeUtils.java
similarity index 63%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/StageNodeSerDeUtils.java
index f96eadfc06..e803d4beee 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/StageNodeSerDeUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.utils.DataSchema;
@@ -27,28 +27,28 @@ public final class StageNodeSerDeUtils {
// do not instantiate.
}
- public static AbstractStageNode deserializeStageNode(Plan.StageNode protoNode) {
- AbstractStageNode stageNode = newNodeInstance(protoNode.getNodeName(), protoNode.getStageId());
- stageNode.setDataSchema(extractDataSchema(protoNode));
- stageNode.fromObjectField(protoNode.getObjectField());
+ public static AbstractPlanNode deserializeStageNode(Plan.StageNode protoNode) {
+ AbstractPlanNode planNode = newNodeInstance(protoNode.getNodeName(), protoNode.getStageId());
+ planNode.setDataSchema(extractDataSchema(protoNode));
+ planNode.fromObjectField(protoNode.getObjectField());
for (Plan.StageNode protoChild : protoNode.getInputsList()) {
- stageNode.addInput(deserializeStageNode(protoChild));
+ planNode.addInput(deserializeStageNode(protoChild));
}
- return stageNode;
+ return planNode;
}
- public static Plan.StageNode serializeStageNode(AbstractStageNode stageNode) {
+ public static Plan.StageNode serializeStageNode(AbstractPlanNode planNode) {
Plan.StageNode.Builder builder = Plan.StageNode.newBuilder()
- .setStageId(stageNode.getStageId())
- .setNodeName(stageNode.getClass().getSimpleName())
- .setObjectField(stageNode.toObjectField());
- DataSchema dataSchema = stageNode.getDataSchema();
+ .setStageId(planNode.getPlanFragmentId())
+ .setNodeName(planNode.getClass().getSimpleName())
+ .setObjectField(planNode.toObjectField());
+ DataSchema dataSchema = planNode.getDataSchema();
for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
builder.addColumnNames(dataSchema.getColumnName(i));
builder.addColumnDataTypes(dataSchema.getColumnDataType(i).name());
}
- for (StageNode childNode : stageNode.getInputs()) {
- builder.addInputs(serializeStageNode((AbstractStageNode) childNode));
+ for (PlanNode childNode : planNode.getInputs()) {
+ builder.addInputs(serializeStageNode((AbstractPlanNode) childNode));
}
return builder.build();
}
@@ -63,30 +63,30 @@ public final class StageNodeSerDeUtils {
return new DataSchema(columnNames, columnDataTypes);
}
- private static AbstractStageNode newNodeInstance(String nodeName, int stageId) {
+ private static AbstractPlanNode newNodeInstance(String nodeName, int planFragmentId) {
switch (nodeName) {
case "TableScanNode":
- return new TableScanNode(stageId);
+ return new TableScanNode(planFragmentId);
case "JoinNode":
- return new JoinNode(stageId);
+ return new JoinNode(planFragmentId);
case "ProjectNode":
- return new ProjectNode(stageId);
+ return new ProjectNode(planFragmentId);
case "FilterNode":
- return new FilterNode(stageId);
+ return new FilterNode(planFragmentId);
case "AggregateNode":
- return new AggregateNode(stageId);
+ return new AggregateNode(planFragmentId);
case "SortNode":
- return new SortNode(stageId);
+ return new SortNode(planFragmentId);
case "MailboxSendNode":
- return new MailboxSendNode(stageId);
+ return new MailboxSendNode(planFragmentId);
case "MailboxReceiveNode":
- return new MailboxReceiveNode(stageId);
+ return new MailboxReceiveNode(planFragmentId);
case "ValueNode":
- return new ValueNode(stageId);
+ return new ValueNode(planFragmentId);
case "WindowNode":
- return new WindowNode(stageId);
+ return new WindowNode(planFragmentId);
case "SetOpNode":
- return new SetOpNode(stageId);
+ return new SetOpNode(planFragmentId);
case "ExchangeNode":
throw new IllegalArgumentException(
"ExchangeNode should be already split into MailboxSendNode and MailboxReceiveNode");
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/TableScanNode.java
similarity index 78%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/TableScanNode.java
index 7711dd6235..a9c92ca387 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/TableScanNode.java
@@ -16,25 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class TableScanNode extends AbstractStageNode {
+public class TableScanNode extends AbstractPlanNode {
@ProtoProperties
private String _tableName;
@ProtoProperties
private List<String> _tableScanColumns;
- public TableScanNode(int stageId) {
- super(stageId);
+ public TableScanNode(int planFragmentId) {
+ super(planFragmentId);
}
- public TableScanNode(int stageId, DataSchema dataSchema, String tableName, List<String> tableScanColumns) {
- super(stageId, dataSchema);
+ public TableScanNode(int planFragmentId, DataSchema dataSchema, String tableName, List<String> tableScanColumns) {
+ super(planFragmentId, dataSchema);
_tableName = tableName;
_tableScanColumns = tableScanColumns;
}
@@ -53,7 +53,7 @@ public class TableScanNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitTableScan(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
similarity index 88%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
index b3ad0d40f6..248af5d8a2 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
@@ -27,12 +27,12 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class ValueNode extends AbstractStageNode {
+public class ValueNode extends AbstractPlanNode {
@ProtoProperties
private List<List<RexExpression>> _literalRows;
- public ValueNode(int stageId) {
- super(stageId);
+ public ValueNode(int planFragmentId) {
+ super(planFragmentId);
}
public ValueNode(int currentStageId, DataSchema dataSchema,
@@ -58,7 +58,7 @@ public class ValueNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitValue(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
similarity index 92%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
index 8e122af119..810859ae22 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import com.clearspring.analytics.util.Preconditions;
import java.util.ArrayList;
@@ -31,7 +31,7 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;
-public class WindowNode extends AbstractStageNode {
+public class WindowNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _groupSet;
@ProtoProperties
@@ -61,12 +61,13 @@ public class WindowNode extends AbstractStageNode {
RANGE
}
- public WindowNode(int stageId) {
- super(stageId);
+ public WindowNode(int planFragmentId) {
+ super(planFragmentId);
}
- public WindowNode(int stageId, List<Window.Group> windowGroups, List<RexLiteral> constants, DataSchema dataSchema) {
- super(stageId, dataSchema);
+ public WindowNode(int planFragmentId, List<Window.Group> windowGroups, List<RexLiteral> constants,
+ DataSchema dataSchema) {
+ super(planFragmentId, dataSchema);
// Only a single Window Group should exist per WindowNode.
Preconditions.checkState(windowGroups.size() == 1,
String.format("Only a single window group is allowed! Number of window groups: %d", windowGroups.size()));
@@ -110,7 +111,7 @@ public class WindowNode extends AbstractStageNode {
}
@Override
- public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
return visitor.visitWindow(this, context);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java
index 5a10b91941..b3057db532 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java
@@ -28,11 +28,12 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import org.apache.pinot.query.planner.plannode.PlanNode;
/**
* Annotation {@code ProtoProperties} indicates whether a field defined in a
- * {@link org.apache.pinot.query.planner.stage.StageNode} should be serialized.
+ * {@link PlanNode} should be serialized.
*/
@Target({ElementType.ANNOTATION_TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/PlanFragmentMetadata.java
similarity index 84%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/PlanFragmentMetadata.java
index 4fc6c2ef50..f1da239417 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/PlanFragmentMetadata.java
@@ -25,13 +25,13 @@ import org.apache.pinot.core.routing.TimeBoundaryInfo;
/**
- * {@code StageMetadata} is used to send stage-level info about how to execute a stage physically.
+ * {@code PlanFragmentMetadata} is used to send plan fragment-level info about how to execute a stage physically.
*/
-public class StageMetadata {
+public class PlanFragmentMetadata {
private final List<WorkerMetadata> _workerMetadataList;
private final Map<String, String> _customProperties;
- public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+ public PlanFragmentMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
_workerMetadataList = workerMetadataList;
_customProperties = customProperties;
}
@@ -71,8 +71,8 @@ public class StageMetadata {
return this;
}
- public StageMetadata build() {
- return new StageMetadata(_workerMetadataList, _customProperties);
+ public PlanFragmentMetadata build() {
+ return new PlanFragmentMetadata(_workerMetadataList, _customProperties);
}
public void putAllCustomProperties(Map<String, String> customPropertyMap) {
@@ -80,11 +80,11 @@ public class StageMetadata {
}
}
- public static String getTableName(StageMetadata metadata) {
+ public static String getTableName(PlanFragmentMetadata metadata) {
return metadata.getCustomProperties().get(Builder.TABLE_NAME_KEY);
}
- public static TimeBoundaryInfo getTimeBoundary(StageMetadata metadata) {
+ public static TimeBoundaryInfo getTimeBoundary(PlanFragmentMetadata metadata) {
String timeColumn = metadata.getCustomProperties().get(Builder.TIME_BOUNDARY_COLUMN_KEY);
String timeValue = metadata.getCustomProperties().get(Builder.TIME_BOUNDARY_VALUE_KEY);
return timeColumn != null && timeValue != null ? new TimeBoundaryInfo(timeColumn, timeValue) : null;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index c1193a9c5b..7ebc6e96d8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -62,9 +62,9 @@ public class WorkerManager {
_routingManager = routingManager;
}
- public void assignWorkerToStage(int stageId, DispatchablePlanMetadata dispatchablePlanMetadata, long requestId,
+ public void assignWorkerToStage(int planFragmentId, DispatchablePlanMetadata dispatchablePlanMetadata, long requestId,
Map<String, String> options, Set<String> tableNames) {
- if (PlannerUtils.isRootStage(stageId)) {
+ if (PlannerUtils.isRootPlanFragment(planFragmentId)) {
// --- ROOT STAGE / BROKER REDUCE STAGE ---
// ROOT stage doesn't have a QueryServer as it is strictly only reducing results.
// here we simply assign the worker instance with identical server/mailbox port number.
@@ -177,7 +177,7 @@ public class WorkerManager {
}
/**
- * Acquire routing table for items listed in {@link org.apache.pinot.query.planner.stage.TableScanNode}.
+ * Acquire routing table for items listed in {@link org.apache.pinot.query.planner.plannode.TableScanNode}.
*
* @param logicalTableName it can either be a hybrid table name or a physical table name with table type.
* @return keyed-map from table type(s) to routing table(s).
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
index 5295782229..9d92bfb697 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
@@ -85,8 +85,8 @@ public class WorkerMetadata {
return this;
}
- public Builder addMailBoxInfoMap(Integer stageId, MailboxMetadata mailBoxMetadata) {
- _mailBoxInfosMap.put(stageId, mailBoxMetadata);
+ public Builder addMailBoxInfoMap(Integer planFragmentId, MailboxMetadata mailBoxMetadata) {
+ _mailBoxInfosMap.put(planFragmentId, mailBoxMetadata);
return this;
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 6d6c4937a0..41a88ef1b6 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -28,17 +28,17 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.query.planner.ExplainPlanStageVisitor;
+import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.AbstractStageNode;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -78,10 +78,11 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
}
}
- private static void assertGroupBySingletonAfterJoin(QueryPlan queryPlan, boolean shouldRewrite) throws Exception {
+ private static void assertGroupBySingletonAfterJoin(QueryPlan queryPlan, boolean shouldRewrite)
+ throws Exception {
for (Map.Entry<Integer, DispatchablePlanMetadata> e : queryPlan.getDispatchablePlanMetadataMap().entrySet()) {
- if (e.getValue().getScannedTables().size() == 0 && !PlannerUtils.isRootStage(e.getKey())) {
- StageNode node = queryPlan.getQueryStageMap().get(e.getKey());
+ if (e.getValue().getScannedTables().size() == 0 && !PlannerUtils.isRootPlanFragment(e.getKey())) {
+ PlanNode node = queryPlan.getQueryStageMap().get(e.getKey());
while (node != null) {
if (node instanceof JoinNode) {
// JOIN is exchanged with hash distribution (data shuffle)
@@ -120,22 +121,22 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
// table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1
Assert.assertEquals(
e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+ .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
tables.get(0).equals("a") ? ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]")
: ImmutableList.of("localhost@{1,1}|[0]"));
- } else if (!PlannerUtils.isRootStage(e.getKey())) {
+ } else if (!PlannerUtils.isRootPlanFragment(e.getKey())) {
// join stage should have both servers used.
Assert.assertEquals(
e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+ .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
ImmutableSet.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
} else {
// reduce stage should have the reducer instance.
Assert.assertEquals(
e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+ .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
ImmutableSet.of("localhost@{3,3}|[0]"));
}
@@ -147,12 +148,12 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
String query = "SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2 "
+ "WHERE a.col3 >= 0 AND a.col2 IN ('b') AND b.col3 < 0";
QueryPlan queryPlan = _queryEnvironment.planQuery(query);
- List<StageNode> intermediateStageRoots =
+ List<PlanNode> intermediateStageRoots =
queryPlan.getDispatchablePlanMetadataMap().entrySet().stream()
.filter(e -> e.getValue().getScannedTables().size() == 0)
.map(e -> queryPlan.getQueryStageMap().get(e.getKey())).collect(Collectors.toList());
// Assert that no project of filter node for any intermediate stage because all should've been pushed down.
- for (StageNode roots : intermediateStageRoots) {
+ for (PlanNode roots : intermediateStageRoots) {
assertNodeTypeNotIn(roots, ImmutableList.of(ProjectNode.class, FilterNode.class));
}
}
@@ -162,21 +163,24 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
String query = "SELECT * FROM d_OFFLINE";
QueryPlan queryPlan = _queryEnvironment.planQuery(query);
List<DispatchablePlanMetadata> tableScanMetadataList = queryPlan.getDispatchablePlanMetadataMap().values().stream()
- .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 0).collect(Collectors.toList());
+ .filter(planFragmentMetadata -> planFragmentMetadata.getScannedTables().size() != 0)
+ .collect(Collectors.toList());
Assert.assertEquals(tableScanMetadataList.size(), 1);
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
query = "SELECT * FROM d_REALTIME";
queryPlan = _queryEnvironment.planQuery(query);
tableScanMetadataList = queryPlan.getDispatchablePlanMetadataMap().values().stream()
- .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 0).collect(Collectors.toList());
+ .filter(planFragmentMetadata -> planFragmentMetadata.getScannedTables().size() != 0)
+ .collect(Collectors.toList());
Assert.assertEquals(tableScanMetadataList.size(), 1);
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 1);
query = "SELECT * FROM d";
queryPlan = _queryEnvironment.planQuery(query);
tableScanMetadataList = queryPlan.getDispatchablePlanMetadataMap().values().stream()
- .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 0).collect(Collectors.toList());
+ .filter(planFragmentMetadata -> planFragmentMetadata.getScannedTables().size() != 0)
+ .collect(Collectors.toList());
Assert.assertEquals(tableScanMetadataList.size(), 1);
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
}
@@ -245,13 +249,13 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
if (tables.size() != 0) {
// table scan stages; for tableB it should have only 1
Assert.assertEquals(e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+ .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
ImmutableList.of("localhost@{1,1}|[0]"));
- } else if (!PlannerUtils.isRootStage(e.getKey())) {
+ } else if (!PlannerUtils.isRootPlanFragment(e.getKey())) {
// join stage should have both servers used.
Assert.assertEquals(e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
- .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+ .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
}
@@ -262,16 +266,16 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
// Test Utils.
// --------------------------------------------------------------------------
- private static void assertNodeTypeNotIn(StageNode node, List<Class<? extends AbstractStageNode>> bannedNodeType) {
+ private static void assertNodeTypeNotIn(PlanNode node, List<Class<? extends AbstractPlanNode>> bannedNodeType) {
Assert.assertFalse(isOneOf(bannedNodeType, node));
- for (StageNode child : node.getInputs()) {
+ for (PlanNode child : node.getInputs()) {
assertNodeTypeNotIn(child, bannedNodeType);
}
}
- private static boolean isOneOf(List<Class<? extends AbstractStageNode>> allowedNodeTypes,
- StageNode node) {
- for (Class<? extends AbstractStageNode> allowedNodeType : allowedNodeTypes) {
+ private static boolean isOneOf(List<Class<? extends AbstractPlanNode>> allowedNodeTypes,
+ PlanNode node) {
+ for (Class<? extends AbstractPlanNode> allowedNodeType : allowedNodeTypes) {
if (node.getClass() == allowedNodeType) {
return true;
}
@@ -281,7 +285,7 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
@DataProvider(name = "testQueryExceptionDataProvider")
private Object[][] provideQueriesWithException() {
- return new Object[][] {
+ return new Object[][]{
// wrong table is being used after JOIN
new Object[]{"SELECT b.col1 - a.col3 FROM a JOIN c ON a.col1 = c.col3", "Table 'b' not found"},
// non-agg column not being grouped
@@ -291,8 +295,10 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
// AT TIME ZONE should fail
new Object[]{"SELECT a.col1 AT TIME ZONE 'PST' FROM a", "No match found for function signature AT_TIME_ZONE"},
// CASE WHEN with non-consolidated result type at compile time.
- new Object[]{"SELECT SUM(CASE WHEN col3 > 10 THEN 1 WHEN col3 > 20 THEN 2 WHEN col3 > 30 THEN 3 "
- + "WHEN col3 > 40 THEN 4 WHEN col3 > 50 THEN '5' ELSE 0 END) FROM a", "while converting CASE WHEN"},
+ new Object[]{
+ "SELECT SUM(CASE WHEN col3 > 10 THEN 1 WHEN col3 > 20 THEN 2 WHEN col3 > 30 THEN 3 "
+ + "WHEN col3 > 40 THEN 4 WHEN col3 > 50 THEN '5' ELSE 0 END) FROM a", "while converting CASE WHEN"
+ },
};
}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/SerDeUtilsTest.java
similarity index 82%
rename from pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java
rename to pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/SerDeUtilsTest.java
index 7bd9fcb6ad..111c83bd66 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/SerDeUtilsTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
import java.lang.reflect.Field;
import java.util.List;
@@ -35,13 +35,13 @@ public class SerDeUtilsTest extends QueryEnvironmentTestBase {
public void testQueryStagePlanSerDe(String query)
throws Exception {
QueryPlan queryPlan = _queryEnvironment.planQuery(query);
- for (StageNode stageNode : queryPlan.getQueryStageMap().values()) {
- Plan.StageNode serializedStageNode = StageNodeSerDeUtils.serializeStageNode((AbstractStageNode) stageNode);
- StageNode deserializedStageNode = StageNodeSerDeUtils.deserializeStageNode(serializedStageNode);
- Assert.assertTrue(isObjectEqual(stageNode, deserializedStageNode));
- Assert.assertEquals(deserializedStageNode.getStageId(), stageNode.getStageId());
- Assert.assertEquals(deserializedStageNode.getDataSchema(), stageNode.getDataSchema());
- Assert.assertEquals(deserializedStageNode.getInputs().size(), stageNode.getInputs().size());
+ for (PlanNode planNode : queryPlan.getQueryStageMap().values()) {
+ Plan.StageNode serializedStageNode = StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) planNode);
+ PlanNode deserializedPlanNode = StageNodeSerDeUtils.deserializeStageNode(serializedStageNode);
+ Assert.assertTrue(isObjectEqual(planNode, deserializedPlanNode));
+ Assert.assertEquals(deserializedPlanNode.getPlanFragmentId(), planNode.getPlanFragmentId());
+ Assert.assertEquals(deserializedPlanNode.getDataSchema(), planNode.getDataSchema());
+ Assert.assertEquals(deserializedPlanNode.getInputs().size(), planNode.getInputs().size());
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index b1e6bc8c58..6f1e0618c7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -40,9 +40,9 @@ import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -170,9 +170,9 @@ public class QueryRunner {
if (isLeafStage(distributedStagePlan)) {
runLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs, requestId);
} else {
- StageNode stageRoot = distributedStagePlan.getStageRoot();
+ PlanNode stageRoot = distributedStagePlan.getStageRoot();
OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
- new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs, deadlineMs,
+ new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
_scheduler.register(rootOperator);
}
@@ -222,7 +222,7 @@ public class QueryRunner {
+ (System.currentTimeMillis() - leafStageStartMillis) + " ms");
MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(_mailboxService, requestId, sendNode.getStageId(),
+ new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(),
distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(),
isTraceEnabled);
MultiStageOperator leafStageOperator =
@@ -247,9 +247,9 @@ public class QueryRunner {
private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore,
MailboxService mailboxService, long deadlineMs) {
- StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
+ PlanFragmentMetadata planFragmentMetadata = distributedStagePlan.getStageMetadata();
WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
- String rawTableName = StageMetadata.getTableName(stageMetadata);
+ String rawTableName = PlanFragmentMetadata.getTableName(planFragmentMetadata);
Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
List<ServerPlanRequestContext> requests = new ArrayList<>();
for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
@@ -263,7 +263,7 @@ public class QueryRunner {
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
- tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE,
+ tableConfig, schema, PlanFragmentMetadata.getTimeBoundary(planFragmentMetadata), TableType.OFFLINE,
tableEntry.getValue(), deadlineMs));
} else if (TableType.REALTIME.name().equals(tableType)) {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
@@ -271,7 +271,7 @@ public class QueryRunner {
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
- tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME,
+ tableConfig, schema, PlanFragmentMetadata.getTimeBoundary(planFragmentMetadata), TableType.REALTIME,
tableEntry.getValue(), deadlineMs));
} else {
throw new IllegalArgumentException("Unsupported table type key: " + tableType);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 4d60969b48..61798f3dc7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 845bec418a..6b29cdc82a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.utils.AggregationUtils;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index 341c0b5969..62cb7c3d45 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -25,7 +25,7 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -67,10 +67,10 @@ public class OperatorUtils {
return functionName;
}
- public static void recordTableName(OperatorStats operatorStats, StageMetadata stageMetadata) {
- if (StageMetadata.getTableName(stageMetadata) != null) {
+ public static void recordTableName(OperatorStats operatorStats, PlanFragmentMetadata planFragmentMetadata) {
+ if (PlanFragmentMetadata.getTableName(planFragmentMetadata) != null) {
operatorStats.recordSingleStat(DataTable.MetadataKey.TABLE.getName(),
- StageMetadata.getTableName(stageMetadata));
+ PlanFragmentMetadata.getTableName(planFragmentMetadata));
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index b00aefab9d..da987d7b9b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -18,8 +18,8 @@
*/
package org.apache.pinot.query.runtime.plan;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -33,19 +33,19 @@ import org.apache.pinot.query.routing.WorkerMetadata;
public class DistributedStagePlan {
private int _stageId;
private VirtualServerAddress _server;
- private StageNode _stageRoot;
- private StageMetadata _stageMetadata;
+ private PlanNode _stageRoot;
+ private PlanFragmentMetadata _planFragmentMetadata;
public DistributedStagePlan(int stageId) {
_stageId = stageId;
}
- public DistributedStagePlan(int stageId, VirtualServerAddress server, StageNode stageRoot,
- StageMetadata stageMetadata) {
+ public DistributedStagePlan(int stageId, VirtualServerAddress server, PlanNode stageRoot,
+ PlanFragmentMetadata planFragmentMetadata) {
_stageId = stageId;
_server = server;
_stageRoot = stageRoot;
- _stageMetadata = stageMetadata;
+ _planFragmentMetadata = planFragmentMetadata;
}
public int getStageId() {
@@ -56,27 +56,27 @@ public class DistributedStagePlan {
return _server;
}
- public StageNode getStageRoot() {
+ public PlanNode getStageRoot() {
return _stageRoot;
}
- public StageMetadata getStageMetadata() {
- return _stageMetadata;
+ public PlanFragmentMetadata getStageMetadata() {
+ return _planFragmentMetadata;
}
public void setServer(VirtualServerAddress serverAddress) {
_server = serverAddress;
}
- public void setStageRoot(StageNode stageRoot) {
+ public void setStageRoot(PlanNode stageRoot) {
_stageRoot = stageRoot;
}
- public void setStageMetadata(StageMetadata stageMetadata) {
- _stageMetadata = stageMetadata;
+ public void setStageMetadata(PlanFragmentMetadata planFragmentMetadata) {
+ _planFragmentMetadata = planFragmentMetadata;
}
public WorkerMetadata getCurrentWorkerMetadata() {
- return _stageMetadata.getWorkerMetadataList().get(_server.workerId());
+ return _planFragmentMetadata.getWorkerMetadataList().get(_server.workerId());
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 11e8107996..0fd764c00e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -19,7 +19,7 @@
package org.apache.pinot.query.runtime.plan;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.operator.OpChainStats;
@@ -37,13 +37,13 @@ public class OpChainExecutionContext {
private final VirtualServerAddress _server;
private final long _timeoutMs;
private final long _deadlineMs;
- private final StageMetadata _stageMetadata;
+ private final PlanFragmentMetadata _planFragmentMetadata;
private final OpChainId _id;
private final OpChainStats _stats;
private final boolean _traceEnabled;
public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
- VirtualServerAddress server, long timeoutMs, long deadlineMs, StageMetadata stageMetadata,
+ VirtualServerAddress server, long timeoutMs, long deadlineMs, PlanFragmentMetadata planFragmentMetadata,
boolean traceEnabled) {
_mailboxService = mailboxService;
_requestId = requestId;
@@ -51,7 +51,7 @@ public class OpChainExecutionContext {
_server = server;
_timeoutMs = timeoutMs;
_deadlineMs = deadlineMs;
- _stageMetadata = stageMetadata;
+ _planFragmentMetadata = planFragmentMetadata;
_id = new OpChainId(requestId, server.workerId(), stageId);
_stats = new OpChainStats(_id.toString());
_traceEnabled = traceEnabled;
@@ -87,8 +87,8 @@ public class OpChainExecutionContext {
return _deadlineMs;
}
- public StageMetadata getStageMetadata() {
- return _stageMetadata;
+ public PlanFragmentMetadata getStageMetadata() {
+ return _planFragmentMetadata;
}
public OpChainId getId() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 27c4302197..f8dfe58e26 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -20,20 +20,20 @@ package org.apache.pinot.query.runtime.plan;
import java.util.ArrayList;
import java.util.List;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.FilterOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
@@ -52,17 +52,17 @@ import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
/**
- * This visitor constructs a physical plan of operators from a {@link StageNode} tree. Note that
+ * This visitor constructs a physical plan of operators from a {@link PlanNode} tree. Note that
* this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
* v1 operators at this point in time.
*
- * <p>This class should be used statically via {@link #build(StageNode, PlanRequestContext)}
+ * <p>This class should be used statically via {@link #build(PlanNode, PlanRequestContext)}
*/
-public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator, PlanRequestContext> {
+public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PlanRequestContext> {
private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
- public static OpChain build(StageNode node, PlanRequestContext context) {
+ public static OpChain build(PlanNode node, PlanRequestContext context) {
MultiStageOperator root = node.visit(INSTANCE, context);
return new OpChain(context.getOpChainExecutionContext(), root, context.getReceivingMailboxIds());
}
@@ -112,7 +112,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitSetOp(SetOpNode setOpNode, PlanRequestContext context) {
List<MultiStageOperator> inputs = new ArrayList<>();
- for (StageNode input : setOpNode.getInputs()) {
+ for (PlanNode input : setOpNode.getInputs()) {
MultiStageOperator visited = input.visit(this, context);
inputs.add(visited);
}
@@ -145,8 +145,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitJoin(JoinNode node, PlanRequestContext context) {
- StageNode left = node.getInputs().get(0);
- StageNode right = node.getInputs().get(1);
+ PlanNode left = node.getInputs().get(0);
+ PlanNode right = node.getInputs().get(1);
MultiStageOperator leftOperator = left.visit(this, context);
MultiStageOperator rightOperator = right.visit(this, context);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index d3d890d9d5..4383fa65c1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -21,7 +21,7 @@ package org.apache.pinot.query.runtime.plan;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -33,20 +33,20 @@ public class PlanRequestContext {
private final long _timeoutMs;
private final long _deadlineMs;
protected final VirtualServerAddress _server;
- protected final StageMetadata _stageMetadata;
+ protected final PlanFragmentMetadata _planFragmentMetadata;
protected final List<String> _receivingMailboxIds = new ArrayList<>();
private final OpChainExecutionContext _opChainExecutionContext;
private final boolean _traceEnabled;
public PlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, long deadlineMs,
- VirtualServerAddress server, StageMetadata stageMetadata, boolean traceEnabled) {
+ VirtualServerAddress server, PlanFragmentMetadata planFragmentMetadata, boolean traceEnabled) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
_timeoutMs = timeoutMs;
_deadlineMs = deadlineMs;
_server = server;
- _stageMetadata = stageMetadata;
+ _planFragmentMetadata = planFragmentMetadata;
_traceEnabled = traceEnabled;
_opChainExecutionContext = new OpChainExecutionContext(this);
}
@@ -71,8 +71,8 @@ public class PlanRequestContext {
return _server;
}
- public StageMetadata getStageMetadata() {
- return _stageMetadata;
+ public PlanFragmentMetadata getStageMetadata() {
+ return _planFragmentMetadata;
}
public MailboxService getMailboxService() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 512e8717e5..d712e524ab 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -37,20 +37,20 @@ import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.parser.CalciteRexExpressionParser;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -76,7 +76,7 @@ import org.slf4j.LoggerFactory;
* As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic
* filtering and other auxiliary functionalities.
*/
-public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPlanRequestContext> {
+public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
private static final Logger LOGGER = LoggerFactory.getLogger(ServerRequestPlanVisitor.class);
private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
@@ -162,7 +162,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
pinotQuery.setQueryOptions(queryOptions);
}
- private static void walkStageNode(StageNode node, ServerPlanRequestContext context) {
+ private static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
node.visit(INSTANCE, context);
}
@@ -264,8 +264,8 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
return null;
}
- private void visitChildren(StageNode node, ServerPlanRequestContext context) {
- for (StageNode child : node.getInputs()) {
+ private void visitChildren(PlanNode node, ServerPlanRequestContext context) {
+ for (PlanNode child : node.getInputs()) {
child.visit(this, context);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index ee2037ec2c..1844df632c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -25,10 +25,10 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.query.planner.stage.AbstractStageNode;
-import org.apache.pinot.query.planner.stage.StageNodeSerDeUtils;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
+import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -55,7 +55,7 @@ public class QueryPlanSerDeUtils {
return Worker.StagePlan.newBuilder()
.setStageId(distributedStagePlan.getStageId())
.setVirtualAddress(addressToProto(distributedStagePlan.getServer()))
- .setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractStageNode) distributedStagePlan.getStageRoot()))
+ .setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) distributedStagePlan.getStageRoot()))
.setStageMetadata(toProtoStageMetadata(distributedStagePlan.getStageMetadata())).build();
}
@@ -79,8 +79,8 @@ public class QueryPlanSerDeUtils {
return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port());
}
- private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
- StageMetadata.Builder builder = new StageMetadata.Builder();
+ private static PlanFragmentMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
+ PlanFragmentMetadata.Builder builder = new PlanFragmentMetadata.Builder();
List<WorkerMetadata> workerMetadataList = new ArrayList<>();
for (Worker.WorkerMetadata protoWorkerMetadata : protoStageMetadata.getWorkerMetadataList()) {
workerMetadataList.add(fromProtoWorkerMetadata(protoWorkerMetadata));
@@ -119,12 +119,12 @@ public class QueryPlanSerDeUtils {
return mailboxMetadata;
}
- private static Worker.StageMetadata toProtoStageMetadata(StageMetadata stageMetadata) {
+ private static Worker.StageMetadata toProtoStageMetadata(PlanFragmentMetadata planFragmentMetadata) {
Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
- for (WorkerMetadata workerMetadata : stageMetadata.getWorkerMetadataList()) {
+ for (WorkerMetadata workerMetadata : planFragmentMetadata.getWorkerMetadataList()) {
builder.addWorkerMetadata(toProtoWorkerMetadata(workerMetadata));
}
- builder.putAllCustomProperty(stageMetadata.getCustomProperties());
+ builder.putAllCustomProperty(planFragmentMetadata.getCustomProperties());
return builder.build();
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 1c0f7168ff..b1ba1624b5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -22,7 +22,7 @@ import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.plan.PlanRequestContext;
import org.apache.pinot.spi.config.table.TableType;
@@ -40,9 +40,9 @@ public class ServerPlanRequestContext extends PlanRequestContext {
protected InstanceRequest _instanceRequest;
public ServerPlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
- long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, PinotQuery pinotQuery,
+ long deadlineMs, VirtualServerAddress server, PlanFragmentMetadata planFragmentMetadata, PinotQuery pinotQuery,
TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) {
- super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, traceEnabled);
+ super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, planFragmentMetadata, traceEnabled);
_pinotQuery = pinotQuery;
_tableType = tableType;
_timeBoundaryInfo = timeBoundaryInfo;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 3524c95693..5454031a9d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -45,12 +45,12 @@ import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.ExplainPlanStageVisitor;
+import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -96,7 +96,7 @@ public class QueryDispatcher {
traceEnabled);
} catch (Exception e) {
cancel(requestId, queryPlan);
- throw new RuntimeException("Error executing query: " + ExplainPlanStageVisitor.explain(queryPlan), e);
+ throw new RuntimeException("Error executing query: " + ExplainPlanPlanVisitor.explain(queryPlan), e);
}
}
@@ -231,8 +231,8 @@ public class QueryDispatcher {
rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
if (stageStatsAggregator != null) {
if (queryPlan != null) {
- StageMetadata stageMetadata = queryPlan.getStageMetadata(operatorStats.getStageId());
- OperatorUtils.recordTableName(operatorStats, stageMetadata);
+ PlanFragmentMetadata planFragmentMetadata = queryPlan.getStageMetadata(operatorStats.getStageId());
+ OperatorUtils.recordTableName(operatorStats, planFragmentMetadata);
}
stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 0d32e8f7bd..37465dc211 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 22d26fe4fb..55e43dae11 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -48,7 +48,7 @@ import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index c9a48f9ba9..cf691b2f09 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -29,7 +29,7 @@ import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
-import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 8c602da156..1764165b1f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -64,8 +64,8 @@ public class MailboxReceiveOperatorTest {
private ReceivingMailbox _mailbox1;
@Mock
private ReceivingMailbox _mailbox2;
- private StageMetadata _stageMetadataBoth;
- private StageMetadata _stageMetadata1;
+ private PlanFragmentMetadata _planFragmentMetadataBoth;
+ private PlanFragmentMetadata _planFragmentMetadata1;
@BeforeMethod
public void setUp() {
@@ -74,7 +74,7 @@ public class MailboxReceiveOperatorTest {
when(_mailboxService.getPort()).thenReturn(123);
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
- _stageMetadataBoth = new StageMetadata.Builder()
+ _planFragmentMetadataBoth = new PlanFragmentMetadata.Builder()
.setWorkerMetadataList(Stream.of(server1, server2).map(
s -> new WorkerMetadata.Builder()
.setVirtualServerAddress(s)
@@ -94,7 +94,7 @@ public class MailboxReceiveOperatorTest {
.collect(Collectors.toList()))
.build();
// sending stage is 0, receiving stage is 1
- _stageMetadata1 = new StageMetadata.Builder()
+ _planFragmentMetadata1 = new PlanFragmentMetadata.Builder()
.setWorkerMetadataList(Stream.of(server1).map(
s -> new WorkerMetadata.Builder()
.setVirtualServerAddress(s)
@@ -120,13 +120,13 @@ public class MailboxReceiveOperatorTest {
public void shouldThrowSingletonNoMatchMailboxServer() {
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
- StageMetadata stageMetadata = new StageMetadata.Builder()
+ PlanFragmentMetadata planFragmentMetadata = new PlanFragmentMetadata.Builder()
.setWorkerMetadataList(Stream.of(server1, server2).map(
s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList()))
.build();
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- stageMetadata, false);
+ planFragmentMetadata, false);
//noinspection resource
new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
}
@@ -135,7 +135,7 @@ public class MailboxReceiveOperatorTest {
public void shouldThrowReceiveSingletonFromMultiMatchMailboxServer() {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
//noinspection resource
new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
}
@@ -157,7 +157,7 @@ public class MailboxReceiveOperatorTest {
// Short timeoutMs should result in timeout
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
Thread.sleep(100L);
TransferableBlock mailbox = receiveOp.nextBlock();
@@ -168,7 +168,7 @@ public class MailboxReceiveOperatorTest {
// Longer timeout or default timeout (10s) doesn't result in timeout
context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
- System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
+ System.currentTimeMillis() + 10_000L, _planFragmentMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
Thread.sleep(100L);
TransferableBlock mailbox = receiveOp.nextBlock();
@@ -182,7 +182,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
assertTrue(receiveOp.nextBlock().isNoOpBlock());
}
@@ -195,7 +195,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -210,7 +210,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
assertEquals(actualRows.size(), 1);
@@ -228,7 +228,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
TransferableBlock block = receiveOp.nextBlock();
assertTrue(block.isErrorBlock());
@@ -247,7 +247,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -271,7 +271,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
// Receive first block from server1
@@ -297,7 +297,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
TransferableBlock block = receiveOp.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 5080ad8454..ec7c00d513 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -23,7 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -165,12 +165,12 @@ public class MailboxSendOperatorTest {
}
private MailboxSendOperator getMailboxSendOperator() {
- StageMetadata stageMetadata = new StageMetadata.Builder()
+ PlanFragmentMetadata planFragmentMetadata = new PlanFragmentMetadata.Builder()
.setWorkerMetadataList(Collections.singletonList(
new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, Long.MAX_VALUE,
- stageMetadata, false);
+ planFragmentMetadata, false);
return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 6d03bb6354..3ec9fc71de 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -41,7 +41,7 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -82,13 +82,13 @@ public class OpChainTest {
private BlockExchange _exchange;
private VirtualServerAddress _serverAddress;
- private StageMetadata _receivingStageMetadata;
+ private PlanFragmentMetadata _receivingPlanFragmentMetadata;
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
_serverAddress = new VirtualServerAddress("localhost", 123, 0);
- _receivingStageMetadata = new StageMetadata.Builder()
+ _receivingPlanFragmentMetadata = new PlanFragmentMetadata.Builder()
.setWorkerMetadataList(Stream.of(_serverAddress).map(
s -> new WorkerMetadata.Builder()
.setVirtualServerAddress(s)
@@ -197,7 +197,7 @@ public class OpChainTest {
int senderStageId = 1;
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+ System.currentTimeMillis() + 1000, _receivingPlanFragmentMetadata, true);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -211,7 +211,7 @@ public class OpChainTest {
OpChainExecutionContext secondStageContext =
new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, 1000,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+ System.currentTimeMillis() + 1000, _receivingPlanFragmentMetadata, true);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -237,7 +237,7 @@ public class OpChainTest {
int senderStageId = 1;
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+ System.currentTimeMillis() + 1000, _receivingPlanFragmentMetadata, false);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -249,7 +249,7 @@ public class OpChainTest {
OpChainExecutionContext secondStageContext =
new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, 1000,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+ System.currentTimeMillis() + 1000, _receivingPlanFragmentMetadata, false);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 1a65a8457f..c17739e789 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -35,7 +35,7 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -73,8 +73,8 @@ public class SortedMailboxReceiveOperatorTest {
@Mock
private ReceivingMailbox _mailbox2;
- private StageMetadata _stageMetadataBoth;
- private StageMetadata _stageMetadata1;
+ private PlanFragmentMetadata _planFragmentMetadataBoth;
+ private PlanFragmentMetadata _planFragmentMetadata1;
@BeforeMethod
public void setUp() {
@@ -83,7 +83,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getPort()).thenReturn(123);
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
- _stageMetadataBoth = new StageMetadata.Builder()
+ _planFragmentMetadataBoth = new PlanFragmentMetadata.Builder()
.setWorkerMetadataList(Stream.of(server1, server2).map(
s -> new WorkerMetadata.Builder()
.setVirtualServerAddress(s)
@@ -103,7 +103,7 @@ public class SortedMailboxReceiveOperatorTest {
.collect(Collectors.toList()))
.build();
// sending stage is 0, receiving stage is 1
- _stageMetadata1 = new StageMetadata.Builder()
+ _planFragmentMetadata1 = new PlanFragmentMetadata.Builder()
.setWorkerMetadataList(Stream.of(server1).map(
s -> new WorkerMetadata.Builder()
.setVirtualServerAddress(s)
@@ -129,13 +129,13 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldThrowSingletonNoMatchMailboxServer() {
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
- StageMetadata stageMetadata = new StageMetadata.Builder()
+ PlanFragmentMetadata planFragmentMetadata = new PlanFragmentMetadata.Builder()
.setWorkerMetadataList(Stream.of(server1, server2).map(
s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList()))
.build();
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- stageMetadata, false);
+ planFragmentMetadata, false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
COLLATION_DIRECTIONS, false, 1);
@@ -145,7 +145,7 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldThrowReceiveSingletonFromMultiMatchMailboxServer() {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
COLLATION_DIRECTIONS, false, 1);
@@ -166,7 +166,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
Collections.emptyList(), false, 1);
@@ -179,7 +179,7 @@ public class SortedMailboxReceiveOperatorTest {
// Short timeoutMs should result in timeout
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
Thread.sleep(100L);
@@ -191,7 +191,7 @@ public class SortedMailboxReceiveOperatorTest {
// Longer timeout or default timeout (10s) doesn't result in timeout
context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
- System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
+ System.currentTimeMillis() + 10_000L, _planFragmentMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
Thread.sleep(100L);
@@ -205,7 +205,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
assertTrue(receiveOp.nextBlock().isNoOpBlock());
@@ -218,7 +218,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -233,7 +233,7 @@ public class SortedMailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -251,7 +251,7 @@ public class SortedMailboxReceiveOperatorTest {
TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ _planFragmentMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
TransferableBlock block = receiveOp.nextBlock();
@@ -270,7 +270,7 @@ public class SortedMailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
assertTrue(receiveOp.nextBlock().isNoOpBlock());
@@ -293,7 +293,7 @@ public class SortedMailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
TransferableBlock block = receiveOp.nextBlock();
@@ -318,7 +318,7 @@ public class SortedMailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
assertEquals(receiveOp.nextBlock().getContainer(), Arrays.asList(row5, row2, row4, row1, row3));
@@ -349,7 +349,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ _planFragmentMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirection, false, 1)) {
assertEquals(receiveOp.nextBlock().getContainer(), Arrays.asList(row1, row2, row3, row5, row4));
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index 7b28eca581..817c7239d6 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index ee1d337166..ab7748850f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -37,9 +37,9 @@ import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
@@ -118,7 +118,7 @@ public class QueryServerTest extends QueryTestSet {
// submit the request for testing.
submitRequest(queryRequest);
- StageMetadata stageMetadata = queryPlan.getStageMetadata(stageId);
+ PlanFragmentMetadata planFragmentMetadata = queryPlan.getStageMetadata(stageId);
// ensure mock query runner received correctly deserialized payload.
QueryRunner mockRunner = _queryRunnerMap.get(
@@ -129,9 +129,9 @@ public class QueryServerTest extends QueryTestSet {
TestUtils.waitForCondition(aVoid -> {
try {
Mockito.verify(mockRunner).processQuery(Mockito.argThat(distributedStagePlan -> {
- StageNode stageNode = queryPlan.getQueryStageMap().get(stageId);
- return isStageNodesEqual(stageNode, distributedStagePlan.getStageRoot())
- && isStageMetadataEqual(stageMetadata, distributedStagePlan.getStageMetadata());
+ PlanNode planNode = queryPlan.getQueryStageMap().get(stageId);
+ return isStageNodesEqual(planNode, distributedStagePlan.getStageRoot())
+ && isStageMetadataEqual(planFragmentMetadata, distributedStagePlan.getStageMetadata());
}), Mockito.argThat(requestMetadataMap ->
requestIdStr.equals(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID))));
return true;
@@ -146,12 +146,13 @@ public class QueryServerTest extends QueryTestSet {
}
}
- private boolean isStageMetadataEqual(StageMetadata expected, StageMetadata actual) {
- if (!EqualityUtils.isEqual(StageMetadata.getTableName(expected), StageMetadata.getTableName(actual))) {
+ private boolean isStageMetadataEqual(PlanFragmentMetadata expected, PlanFragmentMetadata actual) {
+ if (!EqualityUtils.isEqual(PlanFragmentMetadata.getTableName(expected),
+ PlanFragmentMetadata.getTableName(actual))) {
return false;
}
- TimeBoundaryInfo expectedTimeBoundaryInfo = StageMetadata.getTimeBoundary(expected);
- TimeBoundaryInfo actualTimeBoundaryInfo = StageMetadata.getTimeBoundary(actual);
+ TimeBoundaryInfo expectedTimeBoundaryInfo = PlanFragmentMetadata.getTimeBoundary(expected);
+ TimeBoundaryInfo actualTimeBoundaryInfo = PlanFragmentMetadata.getTimeBoundary(actual);
if (expectedTimeBoundaryInfo == null && actualTimeBoundaryInfo != null
|| expectedTimeBoundaryInfo != null && actualTimeBoundaryInfo == null) {
return false;
@@ -184,15 +185,15 @@ public class QueryServerTest extends QueryTestSet {
WorkerMetadata.getTableSegmentsMap(actual));
}
- private static boolean isStageNodesEqual(StageNode left, StageNode right) {
+ private static boolean isStageNodesEqual(PlanNode left, PlanNode right) {
// This only checks the stage tree structure is correct. because the input/stageId fields are not
// part of the generic proto ser/de; which is tested in query planner.
- if (left.getStageId() != right.getStageId() || left.getClass() != right.getClass()
+ if (left.getPlanFragmentId() != right.getPlanFragmentId() || left.getClass() != right.getClass()
|| left.getInputs().size() != right.getInputs().size()) {
return false;
}
- left.getInputs().sort(Comparator.comparingInt(StageNode::getStageId));
- right.getInputs().sort(Comparator.comparingInt(StageNode::getStageId));
+ left.getInputs().sort(Comparator.comparingInt(PlanNode::getPlanFragmentId));
+ right.getInputs().sort(Comparator.comparingInt(PlanNode::getPlanFragmentId));
for (int i = 0; i < left.getInputs().size(); i++) {
if (!isStageNodesEqual(left.getInputs().get(i), right.getInputs().get(i))) {
return false;
@@ -221,8 +222,8 @@ public class QueryServerTest extends QueryTestSet {
int workerId = serverInstanceToWorkerIdMap.get(serverInstance).get(0);
return Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(
- QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId,
- new VirtualServerAddress(serverInstance, workerId))))
+ QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId,
+ new VirtualServerAddress(serverInstance, workerId))))
// the default configurations that must exist.
.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()))
.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index c7a1fbaf81..7e98f1a7ae 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -102,7 +102,7 @@ public class QueryDispatcherTest extends QueryTestSet {
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
QueryDispatcher dispatcher = new QueryDispatcher();
int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), queryPlan, 10_000L, new HashMap<>());
- Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId));
+ Assert.assertTrue(PlannerUtils.isRootPlanFragment(reducerStageId));
dispatcher.shutdown();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org