You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/14 14:43:29 UTC
[pinot] branch master updated: adding PinotLogicalExchange to add exchange type indicator (#10813)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bb4091f12f adding PinotLogicalExchange to add exchange type indicator (#10813)
bb4091f12f is described below
commit bb4091f12f380a739ae08619c18c7874223d1be4
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Wed Jun 14 07:43:20 2023 -0700
adding PinotLogicalExchange to add exchange type indicator (#10813)
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../calcite/rel/logical/PinotLogicalExchange.java | 94 +++++++
.../rel/logical/PinotLogicalSortExchange.java | 31 ++-
.../calcite/rel/logical/PinotRelExchangeType.java | 49 ++++
.../PinotAggregateExchangeNodeInsertRule.java | 12 +-
.../rel/rules/PinotJoinExchangeNodeInsertRule.java | 14 +-
.../rel/rules/PinotJoinToDynamicBroadcastRule.java | 22 +-
.../apache/calcite/rel/rules/PinotRuleUtils.java | 3 +-
.../rules/PinotSetOpExchangeNodeInsertRule.java | 4 +-
.../rules/PinotWindowExchangeNodeInsertRule.java | 19 +-
.../apache/pinot/query/planner/PlanFragment.java | 2 +-
.../planner/logical/PinotLogicalQueryPlanner.java | 7 +-
.../query/planner/logical/PlanFragmenter.java | 14 +-
.../planner/logical/RelToPlanNodeConverter.java | 13 +-
.../planner/logical/ShuffleRewriteVisitor.java | 4 +-
.../query/planner/logical/SubPlanFragmenter.java | 4 +-
.../colocated/GreedyShuffleRewriteVisitor.java | 8 +-
.../pinot/query/planner/plannode/ExchangeNode.java | 19 +-
.../query/planner/plannode/MailboxReceiveNode.java | 23 +-
.../query/planner/plannode/MailboxSendNode.java | 23 +-
.../apache/pinot/query/QueryCompilationTest.java | 16 +-
.../src/test/resources/queries/AggregatePlans.json | 16 +-
.../src/test/resources/queries/GroupByPlans.json | 34 +--
.../src/test/resources/queries/JoinPlans.json | 98 +++----
.../src/test/resources/queries/OrderByPlans.json | 8 +-
.../test/resources/queries/PinotHintablePlans.json | 34 +--
.../src/test/resources/queries/SetOpPlans.json | 38 +--
.../resources/queries/WindowFunctionPlans.json | 298 ++++++++++-----------
.../apache/pinot/query/runtime/QueryRunner.java | 2 +-
.../query/runtime/plan/PhysicalPlanVisitor.java | 6 +-
.../plan/pipeline/PipelineBreakerVisitor.java | 5 +-
30 files changed, 569 insertions(+), 351 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalExchange.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalExchange.java
new file mode 100644
index 0000000000..c08b7ae8ad
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalExchange.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Exchange;
+
+
+/**
+ * Pinot's implementation of {@link Exchange} which carries information about whether the exchange is
+ * done in a streaming or a pipeline-breaking fashion.
+ */
+public class PinotLogicalExchange extends Exchange {
+ private final PinotRelExchangeType _exchangeType;
+
+ private PinotLogicalExchange(RelOptCluster cluster, RelTraitSet traitSet,
+ RelNode input, RelDistribution distribution, PinotRelExchangeType exchangeType) {
+ super(cluster, traitSet, input, distribution);
+ _exchangeType = exchangeType;
+ assert traitSet.containsIfApplicable(Convention.NONE);
+ }
+
+
+ public static PinotLogicalExchange create(RelNode input,
+ RelDistribution distribution) {
+ return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType());
+ }
+
+ /**
+ * Creates a PinotLogicalExchange.
+ *
+ * @param input Input relational expression
+ * @param distribution Distribution specification
+ * @param exchangeType RelExchangeType specification
+ */
+ public static PinotLogicalExchange create(RelNode input,
+ RelDistribution distribution, PinotRelExchangeType exchangeType) {
+ RelOptCluster cluster = input.getCluster();
+ distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
+ RelTraitSet traitSet =
+ input.getTraitSet().replace(Convention.NONE).replace(distribution);
+ return new PinotLogicalExchange(cluster, traitSet, input, distribution, exchangeType);
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ @Override
+ public Exchange copy(RelTraitSet traitSet, RelNode newInput,
+ RelDistribution newDistribution) {
+ return new PinotLogicalExchange(getCluster(), traitSet, newInput,
+ newDistribution, _exchangeType);
+ }
+
+ @Override
+ public RelNode accept(RelShuttle shuttle) {
+ return shuttle.visit(this);
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ RelWriter relWriter = super.explainTerms(pw);
+ if (_exchangeType != PinotRelExchangeType.getDefaultExchangeType()) {
+ relWriter.item("relExchangeType", _exchangeType);
+ }
+ return relWriter;
+ }
+
+ public PinotRelExchangeType getExchangeType() {
+ return _exchangeType;
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
index e019a680c1..ace06a6c43 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
@@ -44,11 +44,13 @@ public class PinotLogicalSortExchange extends SortExchange {
protected final boolean _isSortOnSender;
protected final boolean _isSortOnReceiver;
+ protected final PinotRelExchangeType _exchangeType;
private PinotLogicalSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
- RelNode input, RelDistribution distribution, RelCollation collation,
+ RelNode input, RelDistribution distribution, PinotRelExchangeType exchangeType, RelCollation collation,
boolean isSortOnSender, boolean isSortOnReceiver) {
super(cluster, traitSet, input, distribution, collation);
+ _exchangeType = exchangeType;
_isSortOnSender = isSortOnSender;
_isSortOnReceiver = isSortOnReceiver;
}
@@ -58,15 +60,27 @@ public class PinotLogicalSortExchange extends SortExchange {
*/
public PinotLogicalSortExchange(RelInput input) {
super(input);
+ _exchangeType = PinotRelExchangeType.STREAMING;
_isSortOnSender = false;
_isSortOnReceiver = true;
}
+ public static PinotLogicalSortExchange create(
+ RelNode input,
+ RelDistribution distribution,
+ RelCollation collation,
+ boolean isSortOnSender,
+ boolean isSortOnReceiver) {
+ return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType(), collation, isSortOnSender,
+ isSortOnReceiver);
+ }
+
/**
* Creates a PinotLogicalSortExchange.
*
* @param input Input relational expression
* @param distribution Distribution specification
+ * @param exchangeType Exchange type specification
* @param collation array of sort specifications
* @param isSortOnSender whether to sort on the sender
* @param isSortOnReceiver whether to sort on receiver
@@ -74,6 +88,7 @@ public class PinotLogicalSortExchange extends SortExchange {
public static PinotLogicalSortExchange create(
RelNode input,
RelDistribution distribution,
+ PinotRelExchangeType exchangeType,
RelCollation collation,
boolean isSortOnSender,
boolean isSortOnReceiver) {
@@ -82,7 +97,7 @@ public class PinotLogicalSortExchange extends SortExchange {
distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
RelTraitSet traitSet =
input.getTraitSet().replace(Convention.NONE).replace(distribution).replace(collation);
- return new PinotLogicalSortExchange(cluster, traitSet, input, distribution,
+ return new PinotLogicalSortExchange(cluster, traitSet, input, distribution, exchangeType,
collation, isSortOnSender, isSortOnReceiver);
}
@@ -92,14 +107,18 @@ public class PinotLogicalSortExchange extends SortExchange {
public SortExchange copy(RelTraitSet traitSet, RelNode newInput,
RelDistribution newDistribution, RelCollation newCollation) {
return new PinotLogicalSortExchange(this.getCluster(), traitSet, newInput,
- newDistribution, newCollation, _isSortOnSender, _isSortOnReceiver);
+ newDistribution, _exchangeType, newCollation, _isSortOnSender, _isSortOnReceiver);
}
@Override
public RelWriter explainTerms(RelWriter pw) {
- return super.explainTerms(pw)
+ RelWriter relWriter = super.explainTerms(pw)
.item("isSortOnSender", _isSortOnSender)
.item("isSortOnReceiver", _isSortOnReceiver);
+ if (_exchangeType != PinotRelExchangeType.getDefaultExchangeType()) {
+ relWriter.item("relExchangeType", _exchangeType);
+ }
+ return relWriter;
}
public boolean isSortOnSender() {
@@ -109,4 +128,8 @@ public class PinotLogicalSortExchange extends SortExchange {
public boolean isSortOnReceiver() {
return _isSortOnReceiver;
}
+
+ public PinotRelExchangeType getExchangeType() {
+ return _exchangeType;
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotRelExchangeType.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotRelExchangeType.java
new file mode 100644
index 0000000000..4a06e1d025
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotRelExchangeType.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+/** Type of exchange. */
+public enum PinotRelExchangeType {
+
+ /**
+ * A streaming exchange is one in which data is sent and received in a streaming fashion.
+ */
+ STREAMING,
+
+ /**
+ * A sub-plan exchange is one where the multi-stage query plan executes the sub-tree below the exchange first,
+ * treats the result as a {@link org.apache.calcite.rex.RexLiteral}, and puts it back into the logical plan for
+ * further optimization by the {@link org.apache.calcite.plan.RelOptPlanner}.
+ *
+ * <p>This is useful when the plan can be further optimized if a sub plan is executed first from the broker.</p>
+ */
+ SUB_PLAN,
+
+ /**
+ * A pipeline-breaker is one where data is streamed from the sender, but the receiver will not start execution
+ * until all data has been received successfully.
+ *
+ * <p>This is useful when logical plan is fixed, but physical plan can be further optimized on the server.</p>
+ */
+ PIPELINE_BREAKER;
+
+ public static PinotRelExchangeType getDefaultExchangeType() {
+ return STREAMING;
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index e5fcb1a9ca..c0523f47ca 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -39,8 +39,8 @@ import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
@@ -134,11 +134,11 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
// 2. attach exchange.
List<Integer> groupSetIndices = ImmutableIntList.range(0, oldAggRel.getGroupCount());
- LogicalExchange exchange = null;
+ PinotLogicalExchange exchange = null;
if (groupSetIndices.size() == 0) {
- exchange = LogicalExchange.create(newLeafAgg, RelDistributions.hash(Collections.emptyList()));
+ exchange = PinotLogicalExchange.create(newLeafAgg, RelDistributions.hash(Collections.emptyList()));
} else {
- exchange = LogicalExchange.create(newLeafAgg, RelDistributions.hash(groupSetIndices));
+ exchange = PinotLogicalExchange.create(newLeafAgg, RelDistributions.hash(groupSetIndices));
}
// 3. attach intermediate agg stage.
@@ -146,7 +146,7 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
call.transformTo(newAggNode);
}
- private RelNode makeNewIntermediateAgg(RelOptRuleCall ruleCall, Aggregate oldAggRel, LogicalExchange exchange,
+ private RelNode makeNewIntermediateAgg(RelOptRuleCall ruleCall, Aggregate oldAggRel, PinotLogicalExchange exchange,
boolean isLeafStageAggregationPresent, List<Integer> argList, List<Integer> groupByList) {
// add the exchange as the input node to the relation builder.
@@ -257,7 +257,7 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
}
// 2. Create an exchange on top of the LogicalProject.
- LogicalExchange exchange = LogicalExchange.create(project, RelDistributions.hash(newAggGroupByColumns));
+ PinotLogicalExchange exchange = PinotLogicalExchange.create(project, RelDistributions.hash(newAggGroupByColumns));
// 3. Create an intermediate stage aggregation.
RelNode newAggNode =
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 77fce00ea0..3dcad6998f 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -27,8 +27,8 @@ import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
-import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
import org.apache.calcite.tools.RelBuilderFactory;
@@ -69,16 +69,16 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
if (isColocatedJoin) {
// join exchange are colocated, we should directly pass through via join key
- leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
- rightExchange = LogicalExchange.create(rightInput, RelDistributions.SINGLETON);
+ leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.SINGLETON);
+ rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.SINGLETON);
} else if (joinInfo.leftKeys.isEmpty()) {
// when there's no JOIN key, use broadcast.
- leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
- rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
+ leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+ rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
} else {
// when join key exists, use hash distribution.
- leftExchange = LogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
- rightExchange = LogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
+ leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
+ rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
}
RelNode newJoinNode =
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
index 7fee810386..3512cc581d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
@@ -26,13 +26,15 @@ import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
-import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.zookeeper.common.StringUtils;
@@ -140,7 +142,7 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
: join.getLeft();
RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
: join.getRight();
- return left instanceof LogicalExchange && right instanceof LogicalExchange
+ return left instanceof Exchange && right instanceof Exchange
&& PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
&& (join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
}
@@ -148,23 +150,25 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
@Override
public void onMatch(RelOptRuleCall call) {
Join join = call.rel(0);
- LogicalExchange left = (LogicalExchange) (join.getLeft() instanceof HepRelVertex
+ Exchange left = (Exchange) (join.getLeft() instanceof HepRelVertex
? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft());
- LogicalExchange right = (LogicalExchange) (join.getRight() instanceof HepRelVertex
+ Exchange right = (Exchange) (join.getRight() instanceof HepRelVertex
? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
boolean isColocatedJoin = PinotHintStrategyTable.containsHintOption(join.getHints(),
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
- LogicalExchange dynamicBroadcastExchange = isColocatedJoin
- ? LogicalExchange.create(right.getInput(), RelDistributions.SINGLETON)
- : LogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED);
+ PinotLogicalExchange dynamicBroadcastExchange = isColocatedJoin
+ ? PinotLogicalExchange.create(right.getInput(), RelDistributions.SINGLETON,
+ PinotRelExchangeType.PIPELINE_BREAKER)
+ : PinotLogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED,
+ PinotRelExchangeType.PIPELINE_BREAKER);
Join dynamicFilterJoin =
new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
ImmutableList.copyOf(join.getSystemFieldList()));
// adding pass-through exchange after join b/c currently leaf-stage doesn't support chaining operator(s) after JOIN
- LogicalExchange passThroughAfterJoinExchange =
- LogicalExchange.create(dynamicFilterJoin, RelDistributions.SINGLETON);
+ PinotLogicalExchange passThroughAfterJoinExchange =
+ PinotLogicalExchange.create(dynamicFilterJoin, RelDistributions.SINGLETON);
call.transformTo(passThroughAfterJoinExchange);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
index 5c52512def..530704750f 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
@@ -26,7 +26,6 @@ import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
@@ -71,7 +70,7 @@ public class PinotRuleUtils {
if (relNode instanceof HepRelVertex) {
relNode = ((HepRelVertex) relNode).getCurrentRel();
}
- if (relNode instanceof LogicalExchange) {
+ if (relNode instanceof Exchange) {
return false;
}
for (RelNode child : relNode.getInputs()) {
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
index 16227a1f83..9fe933aa4b 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
@@ -27,10 +27,10 @@ import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalIntersect;
import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
import org.apache.calcite.tools.RelBuilderFactory;
@@ -69,7 +69,7 @@ public class PinotSetOpExchangeNodeInsertRule extends RelOptRule {
List<Integer> hashFields =
IntStream.range(0, setOp.getRowType().getFieldCount()).boxed().collect(Collectors.toCollection(ArrayList::new));
for (RelNode input : setOp.getInputs()) {
- RelNode exchange = LogicalExchange.create(input, RelDistributions.hash(hashFields));
+ RelNode exchange = PinotLogicalExchange.create(input, RelDistributions.hash(hashFields));
newInputs.add(exchange);
}
SetOp newSetOpNode;
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
index ae1ae851b8..da9a8d98ed 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
@@ -34,9 +34,9 @@ import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Window;
-import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -92,7 +92,7 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
Window.Group windowGroup = window.groups.get(0);
if (windowGroup.keys.isEmpty() && windowGroup.orderKeys.getKeys().isEmpty()) {
// Empty OVER()
- // Add a single LogicalExchange for empty OVER() since no sort is required
+ // Add a single Exchange for empty OVER() since no sort is required
if (PinotRuleUtils.isProject(windowInput)) {
// Check for empty LogicalProject below LogicalWindow. If present modify it to be a Literal only project and add
@@ -105,7 +105,8 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
}
}
- LogicalExchange exchange = LogicalExchange.create(windowInput, RelDistributions.hash(Collections.emptyList()));
+ PinotLogicalExchange exchange = PinotLogicalExchange.create(windowInput,
+ RelDistributions.hash(Collections.emptyList()));
call.transformTo(
LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(), window.groups));
} else if (windowGroup.keys.isEmpty() && !windowGroup.orderKeys.getKeys().isEmpty()) {
@@ -126,8 +127,8 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
if (isPartitionByOnly) {
// Only PARTITION BY or PARTITION BY and ORDER BY on the same key(s)
- // Add a LogicalExchange hashed on the partition by keys
- LogicalExchange exchange = LogicalExchange.create(windowInput,
+ // Add an Exchange hashed on the partition by keys
+ PinotLogicalExchange exchange = PinotLogicalExchange.create(windowInput,
RelDistributions.hash(windowGroup.keys.toList()));
call.transformTo(LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(),
window.groups));
@@ -217,7 +218,7 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
*
* This function modifies the empty LogicalProject below the LogicalWindow to add a literal and adds a LogicalProject
* above LogicalWindow to remove the additional literal column from being projected any further. This also handles
- * the addition of the LogicalExchange under the LogicalWindow.
+ * the addition of the Exchange under the LogicalWindow.
*
* TODO: Explore an option to handle empty LogicalProject by actually projecting empty rows for each entry. This way
* there will no longer be a need to add a literal to the empty LogicalProject, but just traverse the number of
@@ -240,9 +241,9 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
outputBuilder.addAll(window.getRowType().getFieldList());
// This scenario is only possible for empty OVER() which uses functions that have no arguments such as COUNT(*) or
- // ROW_NUMBER(). Add a LogicalExchange with empty hash distribution list
- LogicalExchange exchange =
- LogicalExchange.create(projectBelowWindow, RelDistributions.hash(Collections.emptyList()));
+ // ROW_NUMBER(). Add an Exchange with empty hash distribution list
+ PinotLogicalExchange exchange =
+ PinotLogicalExchange.create(projectBelowWindow, RelDistributions.hash(Collections.emptyList()));
Window newWindow = new LogicalWindow(window.getCluster(), window.getTraitSet(), exchange,
window.getConstants(), outputBuilder.build(), window.groups);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragment.java
index b8dd4e1be6..ad0f846180 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragment.java
@@ -24,7 +24,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
/**
* The {@code PlanFragment} is the logical sub query plan that should be scheduled together from the result of
- * {@link org.apache.pinot.query.planner.logical.PinotQueryFragmenter}.
+ * {@link org.apache.pinot.query.planner.logical.PlanFragmenter}.
*
*/
public class PlanFragment {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 795276b1e5..924a580fa5 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.PlanFragmentMetadata;
import org.apache.pinot.query.planner.QueryPlan;
@@ -92,12 +93,14 @@ public class PinotLogicalQueryPlanner {
// receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
PlanNode subPlanRootSenderNode =
new MailboxSendNode(subPlanRoot.getPlanFragmentId(), subPlanRoot.getDataSchema(),
- 0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
+ 0, RelDistribution.Type.RANDOM_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
+ false);
subPlanRootSenderNode.addInput(subPlanRoot);
PlanNode subPlanRootReceiverNode =
new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
- RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false, false, subPlanRootSenderNode);
+ RelDistribution.Type.RANDOM_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
+ false, false, subPlanRootSenderNode);
subPlanRoot = subPlanRootReceiverNode;
PlanFragment planFragment1 = planFragmentContext._planFragmentIdToRootNodeMap.get(1);
planFragmentContext._planFragmentIdToRootNodeMap.put(1,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
index 1c4c90f958..8ea16e8828 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.PlanFragmentMetadata;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
@@ -132,19 +133,21 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context);
List<Integer> distributionKeys = node.getDistributionKeys();
- RelDistribution.Type exchangeType = node.getDistributionType();
+ RelDistribution.Type distributionType = node.getDistributionType();
+ PinotRelExchangeType exchangeType = node.getExchangeType();
// make an exchange sender and receiver node pair
// only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST)
// of exchange will not carry a partition key selector.
- KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+ KeySelector<Object[], Object[]> keySelector = distributionType == RelDistribution.Type.HASH_DISTRIBUTED
? new FieldSelectionKeySelector(distributionKeys) : null;
PlanNode mailboxSender =
new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(),
- currentPlanFragmentId, exchangeType, keySelector, node.getCollations(), node.isSortOnSender());
+ currentPlanFragmentId, distributionType, exchangeType, keySelector, node.getCollations(),
+ node.isSortOnSender());
PlanNode mailboxReceiver = new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(),
- nextPlanFragmentId, exchangeType, keySelector,
+ nextPlanFragmentId, distributionType, exchangeType, keySelector,
node.getCollations(), node.isSortOnSender(), node.isSortOnReceiver(), mailboxSender);
mailboxSender.addInput(nextPlanFragmentRoot);
@@ -159,8 +162,7 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
}
private boolean isPlanFragmentSplitter(PlanNode node) {
- // TODO: always return true for now, we will add more logic here later.
- return true;
+ return ((ExchangeNode) node).getExchangeType() != PinotRelExchangeType.SUB_PLAN;
}
public static class Context {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index 7254ef60dc..63da99df08 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -39,7 +39,9 @@ import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
@@ -108,20 +110,27 @@ public final class RelToPlanNodeConverter {
RelCollation collation = null;
boolean isSortOnSender = false;
boolean isSortOnReceiver = false;
+ PinotRelExchangeType exchangeType = PinotRelExchangeType.getDefaultExchangeType();
if (node instanceof SortExchange) {
collation = ((SortExchange) node).getCollation();
if (node instanceof PinotLogicalSortExchange) {
// These flags only take meaning if the collation is not null or empty
isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
isSortOnReceiver = ((PinotLogicalSortExchange) node).isSortOnReceiver();
+ exchangeType = ((PinotLogicalSortExchange) node).getExchangeType();
+ }
+ } else {
+ if (node instanceof PinotLogicalExchange) {
+ exchangeType = ((PinotLogicalExchange) node).getExchangeType();
}
}
List<RelFieldCollation> fieldCollations = (collation == null) ? null : collation.getFieldCollations();
// Compute all the tables involved under this exchange node
Set<String> tableNames = getTableNamesFromRelRoot(node);
- return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), tableNames, node.getDistribution(),
- fieldCollations, isSortOnSender, isSortOnReceiver);
+
+ return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), exchangeType,
+ tableNames, node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver);
}
private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
index 54d03da712..ba06f3f405 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -136,7 +136,7 @@ public class ShuffleRewriteVisitor implements PlanNodeVisitor<Set<Integer>, Void
KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
if (canSkipShuffle(oldPartitionKeys, selector)) {
- node.setExchangeType(RelDistribution.Type.SINGLETON);
+ node.setDistributionType(RelDistribution.Type.SINGLETON);
return oldPartitionKeys;
} else if (selector == null) {
return new HashSet<>();
@@ -151,7 +151,7 @@ public class ShuffleRewriteVisitor implements PlanNodeVisitor<Set<Integer>, Void
KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
if (canSkipShuffle(oldPartitionKeys, selector)) {
- node.setExchangeType(RelDistribution.Type.SINGLETON);
+ node.setDistributionType(RelDistribution.Type.SINGLETON);
return oldPartitionKeys;
} else {
// reset the context partitionKeys since we've determined that
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
index adb87d1ad8..938920ea57 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.SubPlanMetadata;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -138,8 +139,7 @@ public class SubPlanFragmenter implements PlanNodeVisitor<PlanNode, SubPlanFragm
}
private boolean isSubPlanSplitter(PlanNode node) {
- // TODO: implement this when we introduce a new type of exchange node for sub-plan splitter
- return false;
+ return ((ExchangeNode) node).getExchangeType() == PinotRelExchangeType.SUB_PLAN;
}
public static class Context {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 0057dc35ab..50f78e64bf 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -173,7 +173,7 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
return new HashSet<>();
} else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getPlanFragmentId(),
node.getSenderStageId())) {
- node.setExchangeType(RelDistribution.Type.SINGLETON);
+ node.setDistributionType(RelDistribution.Type.SINGLETON);
_dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
_dispatchablePlanMetadataMap.get(node.getSenderStageId()).getServerInstanceToWorkerIdMap());
return oldColocationKeys;
@@ -187,10 +187,10 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
}
// If the current stage is a join-stage then we already know whether shuffle can be skipped.
if (_canSkipShuffleForJoin) {
- node.setExchangeType(RelDistribution.Type.SINGLETON);
+ node.setDistributionType(RelDistribution.Type.SINGLETON);
// For the join-case, servers are already re-assigned in visitJoin. Moreover, we haven't yet changed sender's
// distribution.
- ((MailboxSendNode) node.getSender()).setExchangeType(RelDistribution.Type.SINGLETON);
+ ((MailboxSendNode) node.getSender()).setDistributionType(RelDistribution.Type.SINGLETON);
return oldColocationKeys;
} else if (selector == null) {
return new HashSet<>();
@@ -214,7 +214,7 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
Set<ColocationKey> colocationKeys;
if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getPlanFragmentId())) {
// Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side.
- node.setExchangeType(RelDistribution.Type.SINGLETON);
+ node.setDistributionType(RelDistribution.Type.SINGLETON);
colocationKeys = oldColocationKeys;
} else {
colocationKeys = new HashSet<>();
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index 46baec9be5..1d136efb20 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -33,7 +34,10 @@ import org.apache.pinot.query.planner.serde.ProtoProperties;
public class ExchangeNode extends AbstractPlanNode {
@ProtoProperties
- private RelDistribution.Type _exchangeType;
+ private PinotRelExchangeType _exchangeType;
+
+ @ProtoProperties
+ private RelDistribution.Type _distributionType;
@ProtoProperties
private List<Integer> _keys;
@@ -57,12 +61,13 @@ public class ExchangeNode extends AbstractPlanNode {
super(planFragmentId);
}
- public ExchangeNode(int currentStageId, DataSchema dataSchema, Set<String> tableNames, RelDistribution distribution,
- List<RelFieldCollation> collations, boolean isSortOnSender,
+ public ExchangeNode(int currentStageId, DataSchema dataSchema, PinotRelExchangeType exchangeType,
+ Set<String> tableNames, RelDistribution distribution, List<RelFieldCollation> collations, boolean isSortOnSender,
boolean isSortOnReceiver) {
super(currentStageId, dataSchema);
+ _exchangeType = exchangeType;
_keys = distribution.getKeys();
- _exchangeType = distribution.getType();
+ _distributionType = distribution.getType();
_isSortOnSender = isSortOnSender;
_isSortOnReceiver = isSortOnReceiver;
_collations = collations;
@@ -79,10 +84,14 @@ public class ExchangeNode extends AbstractPlanNode {
return visitor.visitExchange(this, context);
}
- public RelDistribution.Type getDistributionType() {
+ public PinotRelExchangeType getExchangeType() {
return _exchangeType;
}
+ public RelDistribution.Type getDistributionType() {
+ return _distributionType;
+ }
+
public List<Integer> getDistributionKeys() {
return _keys;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
index e67b8343ed..c2515edbb6 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
@@ -38,7 +39,9 @@ public class MailboxReceiveNode extends AbstractPlanNode {
@ProtoProperties
private int _senderStageId;
@ProtoProperties
- private RelDistribution.Type _exchangeType;
+ private RelDistribution.Type _distributionType;
+ @ProtoProperties
+ private PinotRelExchangeType _exchangeType;
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;
@ProtoProperties
@@ -61,11 +64,13 @@ public class MailboxReceiveNode extends AbstractPlanNode {
}
public MailboxReceiveNode(int planFragmentId, DataSchema dataSchema, int senderStageId,
- RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+ RelDistribution.Type distributionType, PinotRelExchangeType exchangeType,
+ @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
@Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver,
PlanNode sender) {
super(planFragmentId, dataSchema);
_senderStageId = senderStageId;
+ _distributionType = distributionType;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
if (!CollectionUtils.isEmpty(fieldCollations)) {
@@ -104,11 +109,19 @@ public class MailboxReceiveNode extends AbstractPlanNode {
return _senderStageId;
}
- public void setExchangeType(RelDistribution.Type exchangeType) {
+ public void setDistributionType(RelDistribution.Type distributionType) {
+ _distributionType = distributionType;
+ }
+
+ public RelDistribution.Type getDistributionType() {
+ return _distributionType;
+ }
+
+ public void setExchangeType(PinotRelExchangeType exchangeType) {
_exchangeType = exchangeType;
}
- public RelDistribution.Type getExchangeType() {
+ public PinotRelExchangeType getExchangeType() {
return _exchangeType;
}
@@ -142,7 +155,7 @@ public class MailboxReceiveNode extends AbstractPlanNode {
@Override
public String explain() {
- return "MAIL_RECEIVE(" + _exchangeType + ")";
+ return "MAIL_RECEIVE(" + _distributionType + ")";
}
@Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index 08624959b7..1edf2902df 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -25,6 +25,7 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
@@ -36,7 +37,9 @@ public class MailboxSendNode extends AbstractPlanNode {
@ProtoProperties
private int _receiverStageId;
@ProtoProperties
- private RelDistribution.Type _exchangeType;
+ private RelDistribution.Type _distributionType;
+ @ProtoProperties
+ private PinotRelExchangeType _exchangeType;
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;
@ProtoProperties
@@ -51,10 +54,12 @@ public class MailboxSendNode extends AbstractPlanNode {
}
public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverStageId,
- RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+ RelDistribution.Type distributionType, PinotRelExchangeType exchangeType,
+ @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
@Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender) {
super(planFragmentId, dataSchema);
_receiverStageId = receiverStageId;
+ _distributionType = distributionType;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
// TODO: Support ordering here if the 'fieldCollations' aren't empty and 'sortOnSender' is true
@@ -81,11 +86,19 @@ public class MailboxSendNode extends AbstractPlanNode {
_receiverStageId = receiverStageId;
}
- public void setExchangeType(RelDistribution.Type exchangeType) {
+ public void setDistributionType(RelDistribution.Type distributionType) {
+ _distributionType = distributionType;
+ }
+
+ public RelDistribution.Type getDistributionType() {
+ return _distributionType;
+ }
+
+ public void setExchangeType(PinotRelExchangeType exchangeType) {
_exchangeType = exchangeType;
}
- public RelDistribution.Type getExchangeType() {
+ public PinotRelExchangeType getExchangeType() {
return _exchangeType;
}
@@ -107,7 +120,7 @@ public class MailboxSendNode extends AbstractPlanNode {
@Override
public String explain() {
- return "MAIL_SEND(" + _exchangeType + ")";
+ return "MAIL_SEND(" + _distributionType + ")";
}
@Override
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index c3e45746bf..0073455ed1 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -89,17 +89,17 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
// JOIN is exchanged with hash distribution (data shuffle)
MailboxReceiveNode left = (MailboxReceiveNode) node.getInputs().get(0);
MailboxReceiveNode right = (MailboxReceiveNode) node.getInputs().get(1);
- Assert.assertEquals(left.getExchangeType(), RelDistribution.Type.HASH_DISTRIBUTED);
- Assert.assertEquals(right.getExchangeType(), RelDistribution.Type.HASH_DISTRIBUTED);
+ Assert.assertEquals(left.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
+ Assert.assertEquals(right.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
break;
}
if (node instanceof AggregateNode && node.getInputs().get(0) instanceof MailboxReceiveNode) {
// AGG is exchanged with singleton since it has already been distributed by JOIN.
MailboxReceiveNode input = (MailboxReceiveNode) node.getInputs().get(0);
if (shouldRewrite) {
- Assert.assertEquals(input.getExchangeType(), RelDistribution.Type.SINGLETON);
+ Assert.assertEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
} else {
- Assert.assertNotEquals(input.getExchangeType(), RelDistribution.Type.SINGLETON);
+ Assert.assertNotEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
}
break;
}
@@ -330,18 +330,18 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
new Object[]{"EXPLAIN PLAN EXCLUDING ATTRIBUTES AS DOT FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
"Execution Plan\n"
+ "digraph {\n"
- + "\"LogicalExchange\\n\" -> \"LogicalAggregate\\n\" [label=\"0\"]\n"
- + "\"LogicalAggregate\\n\" -> \"LogicalExchange\\n\" [label=\"0\"]\n"
+ + "\"PinotLogicalExchange\\n\" -> \"LogicalAggregate\\n\" [label=\"0\"]\n"
+ + "\"LogicalAggregate\\n\" -> \"PinotLogicalExchange\\n\" [label=\"0\"]\n"
+ "\"LogicalTableScan\\n\" -> \"LogicalAggregate\\n\" [label=\"0\"]\n"
+ "}\n"},
new Object[]{"EXPLAIN PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
"Execution Plan\n"
+ "LogicalProject(col1=[$0], col3=[$2])\n"
+ " LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n"
- + " LogicalExchange(distribution=[hash[0]])\n"
+ + " PinotLogicalExchange(distribution=[hash[0]])\n"
+ " LogicalProject(col1=[$0])\n"
+ " LogicalTableScan(table=[[a]])\n"
- + " LogicalExchange(distribution=[hash[0]])\n"
+ + " PinotLogicalExchange(distribution=[hash[0]])\n"
+ " LogicalProject(col1=[$0], col3=[$2])\n"
+ " LogicalTableScan(table=[[b]])\n"
},
diff --git a/pinot-query-planner/src/test/resources/queries/AggregatePlans.json b/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
index 7f4dcf0f47..ffb95c703e 100644
--- a/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
@@ -8,7 +8,7 @@
"Execution Plan",
"\nLogicalProject(avg=[/(CASE(=($1, 0), null:DECIMAL(1000, 0), $0), $1)])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2], col4=[$3])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -23,7 +23,7 @@
"Execution Plan",
"\nLogicalProject(avg=[/(CASE(=($1, 0), null:DECIMAL(1000, 0), $0), $1)], sum=[CASE(=($1, 0), null:DECIMAL(1000, 0), $0)], max=[$2])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)], max=[MAX($2)])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($2)], agg#1=[COUNT()], max=[MAX($2)])",
"\n LogicalProject(col2=[$1], col3=[$2], col4=[$3])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -38,7 +38,7 @@
"Execution Plan",
"\nLogicalProject(avg=[/(CAST(CASE(=($1, 0), null:INTEGER, $0)):DOUBLE, $1)], count=[$1])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -53,7 +53,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[CASE(=($1, 0), null:INTEGER, $0)], EXPR$1=[$1])",
"\n LogicalAggregate(group=[{}], EXPR$0=[$SUM0($0)], agg#1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], EXPR$0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -66,7 +66,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[CASE(=($1, 0), null:INTEGER, $0)], EXPR$1=[$1])",
"\n LogicalAggregate(group=[{}], EXPR$0=[$SUM0($0)], agg#1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], EXPR$0=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -81,7 +81,7 @@
"Execution Plan",
"\nLogicalProject(sum=[CASE(=($1, 0), null:INTEGER, $0)], count=[$1])",
"\n LogicalAggregate(group=[{}], sum=[$SUM0($0)], agg#1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], sum=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -96,7 +96,7 @@
"Execution Plan",
"\nLogicalProject(avg=[/(CAST(CASE(=($1, 0), null:INTEGER, $0)):DOUBLE, $1)], count=[$1])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -111,7 +111,7 @@
"Execution Plan",
"\nLogicalProject(sum=[CASE(=($1, 0), null:INTEGER, $0)], count=[$1])",
"\n LogicalAggregate(group=[{}], sum=[$SUM0($0)], agg#1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], sum=[$SUM0($1)], agg#1=[COUNT()])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
diff --git a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
index 8ac6fdf7f0..c05cfba1b5 100644
--- a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
@@ -7,7 +7,7 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -20,7 +20,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[/(CAST($1):DOUBLE NOT NULL, $2)], EXPR$3=[$3], EXPR$4=[$4])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], agg#1=[$SUM0($2)], EXPR$3=[MAX($3)], EXPR$4=[MIN($4)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)], agg#1=[COUNT()], EXPR$3=[MAX($2)], EXPR$4=[MIN($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -32,7 +32,7 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -47,7 +47,7 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -62,7 +62,7 @@
"Execution Plan",
"\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0, 'a'))])",
@@ -78,7 +78,7 @@
"\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[$2])",
"\n LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], EXPR$2=[$SUM0($2)], agg#2=[MAX($3)], agg#3=[MIN($4)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[$SUM0($2)], agg#2=[MAX($2)], agg#3=[MIN($2)])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -94,7 +94,7 @@
"\nLogicalProject(value1=[$0], count=[$1], SUM=[$2])",
"\n LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
"\n LogicalAggregate(group=[{0}], count=[$SUM0($1)], SUM=[$SUM0($2)], agg#2=[MAX($3)], agg#3=[MIN($4)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], count=[COUNT()], SUM=[$SUM0($2)], agg#2=[MAX($2)], agg#3=[MIN($2)])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -108,7 +108,7 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -121,7 +121,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -134,7 +134,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[/(CAST($1):DOUBLE NOT NULL, $2)], EXPR$3=[$3], EXPR$4=[$4])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], agg#1=[COUNT()], EXPR$3=[MAX($1)], EXPR$4=[MIN($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -146,7 +146,7 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -159,7 +159,7 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)], EXPR$2=[MAX($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -173,7 +173,7 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[COUNT()])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -187,7 +187,7 @@
"Execution Plan",
"\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -202,7 +202,7 @@
"\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[$2])",
"\n LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[$SUM0($2)], agg#2=[MAX($2)], agg#3=[MIN($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -217,7 +217,7 @@
"\nLogicalProject(col1=[$0], EXPR$1=[$1])",
"\n LogicalFilter(condition=[AND(>=($2, 0), <($3, 20), <=($1, 10), =(/(CAST($1):DOUBLE NOT NULL, $4), 5))])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)], agg#1=[MAX($2)], agg#2=[MIN($2)], agg#3=[COUNT()])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -232,7 +232,7 @@
"\nLogicalProject(value1=[$0], count=[$1], SUM=[$2])",
"\n LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
"\n LogicalAggregate(group=[{0}], count=[COUNT()], SUM=[$SUM0($2)], agg#2=[MAX($2)], agg#3=[MIN($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index 2ccebd1668..ea5576e727 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -11,10 +11,10 @@
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], ts=[$4])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[b]])",
"\n"
@@ -30,10 +30,10 @@
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], ts=[$4])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[b]])",
"\n"
@@ -45,9 +45,9 @@
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($0, $6)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
@@ -58,10 +58,10 @@
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($0, $6)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
@@ -72,10 +72,10 @@
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[AND(=($0, $6), >($2, $7))], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
@@ -86,9 +86,9 @@
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
@@ -100,11 +100,11 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], ts=[$1], col3=[$3])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], ts=[$4])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[<($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -118,14 +118,14 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[<($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -139,10 +139,10 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[dateTrunc('DAY', +($2, $5))])",
"\n LogicalJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], ts=[$4])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], ts=[$4])",
"\n LogicalTableScan(table=[[b]])",
"\n"
@@ -154,14 +154,14 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0, 1}])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{1, 2}])",
"\n LogicalJoin(condition=[=($0, $3)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalTableScan(table=[[b]])",
"\n"
@@ -173,14 +173,14 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0, 1}])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{1, 2}])",
"\n LogicalJoin(condition=[=($0, $3)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalTableScan(table=[[b]])",
"\n"
@@ -193,11 +193,11 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$2])",
"\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[OR(=($1, 'bar'), =($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalFilter(condition=[AND(<>($1, 'alice':VARCHAR(7)), <>($1, 'charlie':VARCHAR(7)))])",
"\n LogicalTableScan(table=[[b]])",
@@ -211,11 +211,11 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], bvalue2=[$2])",
"\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[OR(=($1, 'bar'), =($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalFilter(condition=[AND(<>($1, 'alice':VARCHAR(7)), <>($1, 'charlie':VARCHAR(7)))])",
"\n LogicalTableScan(table=[[b]])",
@@ -229,10 +229,10 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
"\n LogicalJoin(condition=[=($2, $5)], joinType=[semi])",
- "\n LogicalExchange(distribution=[hash[2]])",
+ "\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[2]])",
+ "\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
@@ -244,23 +244,23 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
"\n LogicalJoin(condition=[=($2, $4)], joinType=[semi])",
- "\n LogicalExchange(distribution=[hash[2]])",
+ "\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalJoin(condition=[=($2, $4)], joinType=[semi])",
- "\n LogicalExchange(distribution=[hash[2]])",
+ "\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalJoin(condition=[=($2, $4)], joinType=[semi])",
- "\n LogicalExchange(distribution=[hash[2]])",
+ "\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[=($1, 'test')])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[=($0, 'foo')])",
"\n LogicalTableScan(table=[[b]])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[=($0, 'bar')])",
"\n LogicalTableScan(table=[[b]])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[=($0, 'foobar')])",
"\n LogicalTableScan(table=[[b]])",
@@ -275,35 +275,35 @@
"\nLogicalProject(col1=[$0], col2=[$1])",
"\n LogicalFilter(condition=[IS NOT TRUE($8)])",
"\n LogicalJoin(condition=[=($6, $7)], joinType=[left])",
- "\n LogicalExchange(distribution=[hash[6]])",
+ "\n PinotLogicalExchange(distribution=[hash[6]])",
"\n LogicalProject(col1=[$0], col2=[$1], col30=[$3], $f1=[$4], col32=[$5], $f10=[$7], col34=[$2])",
"\n LogicalFilter(condition=[IS NOT TRUE($7)])",
"\n LogicalJoin(condition=[=($5, $6)], joinType=[left])",
- "\n LogicalExchange(distribution=[hash[5]])",
+ "\n PinotLogicalExchange(distribution=[hash[5]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2], col30=[$3], $f1=[$5], col32=[$2])",
"\n LogicalFilter(condition=[IS NOT TRUE($5)])",
"\n LogicalJoin(condition=[=($3, $4)], joinType=[left])",
- "\n LogicalExchange(distribution=[hash[3]])",
+ "\n PinotLogicalExchange(distribution=[hash[3]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2], col30=[$2])",
"\n LogicalFilter(condition=[=($1, 'test')])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, 'foo')])",
"\n LogicalTableScan(table=[[b]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, 'bar')])",
"\n LogicalTableScan(table=[[b]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, 'foobar')])",
@@ -318,12 +318,12 @@
"Execution Plan",
"\nLogicalProject(col1=[$0])",
"\n LogicalJoin(condition=[AND(=($0, $5), =($1, $6), >($3, $10))], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col10=[CAST($0):VARCHAR], col20=[CAST($1):VARCHAR], $f2=[CAST($2):INTEGER], EXPR$3=[*(0.5:DECIMAL(2, 1), $2)])",
"\n LogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[b]])",
"\n"
diff --git a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
index f0d1883d45..5219ae95e4 100644
--- a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
@@ -60,7 +60,7 @@
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -75,7 +75,7 @@
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -90,7 +90,7 @@
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], sum=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -105,7 +105,7 @@
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
diff --git a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
index af10c27bd3..71b261357c 100644
--- a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
@@ -7,10 +7,10 @@
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($0, $6)], joinType=[inner])",
- "\n LogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[single])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[single])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
@@ -23,11 +23,11 @@
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
- "\n LogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[single])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[single])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[<($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -39,11 +39,11 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',is_colocated_by_join_keys='false') */ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"output": [
"Execution Plan",
- "\nLogicalExchange(distribution=[single])",
+ "\nPinotLogicalExchange(distribution=[single])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[broadcast])",
+ "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[>($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -56,9 +56,9 @@
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($0, $5)], joinType=[semi])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[>($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -70,11 +70,11 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"output": [
"Execution Plan",
- "\nLogicalExchange(distribution=[single])",
+ "\nPinotLogicalExchange(distribution=[single])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[single], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[>($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -87,11 +87,11 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[single])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[broadcast])",
+ "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[>($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -104,13 +104,13 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{1}], EXPR$1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[single])",
+ "\n PinotLogicalExchange(distribution=[single])",
"\n LogicalJoin(condition=[=($0, $3)], joinType=[semi])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[broadcast])",
+ "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[>($2, 0)])",
"\n LogicalTableScan(table=[[b]])",
@@ -124,7 +124,7 @@
"Execution Plan",
"\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -139,7 +139,7 @@
"\nLogicalProject(col2=[$0], EXPR$1=[$1], EXPR$2=[$2], EXPR$3=[$3])",
"\n LogicalFilter(condition=[AND(>($1, 10), >=($4, 0), <($5, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[$SUM0($1)], EXPR$3=[$SUM0($2)], agg#3=[MAX($1)], agg#4=[MIN($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2], $f2=[CAST($0):DECIMAL(1000, 500) NOT NULL])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
"\n LogicalTableScan(table=[[a]])",
diff --git a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
index c131fecb49..1e0991e96a 100644
--- a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
@@ -7,10 +7,10 @@
"output": [
"Execution Plan",
"\nLogicalUnion(all=[true])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[b]])",
"\n"
@@ -22,15 +22,15 @@
"output": [
"Execution Plan",
"\nLogicalUnion(all=[true])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalUnion(all=[true])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[b]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[c]])",
"\n"
@@ -42,18 +42,18 @@
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0, 1}])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 1}])",
"\n LogicalUnion(all=[true])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalUnion(all=[true])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[b]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[c]])",
"\n"
@@ -65,15 +65,15 @@
"output": [
"Execution Plan",
"\nLogicalIntersect(all=[false])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalIntersect(all=[false])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[b]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[c]])",
"\n"
@@ -85,15 +85,15 @@
"output": [
"Execution Plan",
"\nLogicalMinus(all=[false])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalMinus(all=[false])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[b]])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[c]])",
"\n"
diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index b93665800e..2f75b4b2ce 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -8,7 +8,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -21,7 +21,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(aggs [COUNT()])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(winLiteral=[0])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -35,7 +35,7 @@
"\nLogicalProject(EXPR$0=[42], EXPR$1=[$0])",
"\n LogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(aggs [COUNT()])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(winLiteral=[0])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -48,7 +48,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(winLiteral=[0])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -61,7 +61,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -73,7 +73,7 @@
"output": [
"Execution Plan",
"\nLogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -87,7 +87,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -101,7 +101,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -117,7 +117,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col1=[$0], EXPR$1=[$3], col2=[$1])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -133,7 +133,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
"\n LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -149,7 +149,7 @@
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -165,7 +165,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[20])",
"\n LogicalProject(col1=[$0], EXPR$1=[$3], col2=[$1])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -178,7 +178,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -191,7 +191,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[>($2, 10)])",
"\n LogicalTableScan(table=[[a]])",
@@ -204,7 +204,7 @@
"output": [
"Execution Plan",
"\nLogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[>($2, 10)])",
"\n LogicalTableScan(table=[[a]])",
@@ -218,7 +218,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [MIN($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
"\n LogicalFilter(condition=[OR(=($0, 'bar'), =($0, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -232,9 +232,9 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(aggs [MIN($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{0}])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -247,9 +247,9 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [MIN($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{0, 1}])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -262,9 +262,9 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [MIN($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{0}], EXPR$0=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], EXPR$0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -277,10 +277,10 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -296,10 +296,10 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], col3=[$0])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -315,10 +315,10 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[$2], col3=[$0])",
"\n LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -331,7 +331,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -344,7 +344,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -358,7 +358,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -372,7 +372,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0), MIN($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -388,7 +388,7 @@
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -404,7 +404,7 @@
"\n LogicalSort(fetch=[100])",
"\n LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -420,7 +420,7 @@
"\n LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -433,7 +433,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0), MAX($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -446,7 +446,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($1), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[>($2, 100)])",
"\n LogicalTableScan(table=[[a]])",
@@ -460,7 +460,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
"\n LogicalWindow(window#0=[window(aggs [MIN($0), MAX($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$2], $1=[LENGTH(CONCAT($0, ' ', $1))])",
"\n LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 'baz'), <>($0, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -475,7 +475,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$0], EXPR$1=[$1], EXPR$2=[$1])",
"\n LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject($0=[LENGTH(CONCAT($0, ' ', $1))])",
"\n LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 'baz'), <>($0, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -489,9 +489,9 @@
"Execution Plan",
"\nLogicalProject($0=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [MIN($0), SUM($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{0}])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -504,9 +504,9 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$2], $2=[$3])",
"\n LogicalWindow(window#0=[window(aggs [MIN($1), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{0, 1}])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -519,9 +519,9 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], $1=[$2], $2=[$3])",
"\n LogicalWindow(window#0=[window(aggs [MIN($0), MAX($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{0}], EXPR$0=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], EXPR$0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -534,10 +534,10 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -553,10 +553,10 @@
"\n LogicalSort(sort0=[$3], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$2], col3=[$0])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash])",
+ "\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -569,7 +569,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -582,7 +582,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -596,7 +596,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -609,7 +609,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -622,7 +622,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$0], avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -636,7 +636,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(partition {1} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -650,7 +650,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MAX($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -666,7 +666,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($2)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -682,7 +682,7 @@
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -698,7 +698,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($2)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -714,7 +714,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -727,7 +727,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3])",
"\n LogicalWindow(window#0=[window(partition {1} aggs [COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col2=[$1], col3=[$2], $2=[SUBSTR($0, 0, 2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -740,7 +740,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$2], $1=[SUBSTR($0, 0, 2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -753,7 +753,7 @@
"Execution Plan",
"\nLogicalProject(col2=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
"\n LogicalTableScan(table=[[a]])",
@@ -767,7 +767,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2], $2=[CONCAT($0, '-', $1)])",
"\n LogicalFilter(condition=[OR(AND(<>($0, 'bar'), <>($0, 'foo')), >=($2, 42))])",
"\n LogicalTableScan(table=[[a]])",
@@ -781,7 +781,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -794,7 +794,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject($0=[CONCAT($0, '-', $1)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -807,9 +807,9 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -822,9 +822,9 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0, 1}])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -836,10 +836,10 @@
"output": [
"Execution Plan",
"\nLogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0])",
"\n LogicalAggregate(group=[{0, 1}])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -852,9 +852,9 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], EXPR$0=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], EXPR$0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -867,10 +867,10 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -886,10 +886,10 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], col3=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -902,7 +902,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MAX($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -916,7 +916,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MAX($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -929,7 +929,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -942,7 +942,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$0], avg=[/(CAST($2):DOUBLE NOT NULL, $3)], min=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -956,7 +956,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -972,7 +972,7 @@
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), MAX($2)])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -988,7 +988,7 @@
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1004,7 +1004,7 @@
"\n LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), MAX($2)])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1020,7 +1020,7 @@
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), MAX($2)])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1036,7 +1036,7 @@
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1053,7 +1053,7 @@
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$0], $1=[$2], $2=[$3])",
"\n LogicalWindow(window#0=[window(partition {0, 1} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER(), ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1069,7 +1069,7 @@
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($4):DOUBLE NOT NULL, $5)])",
"\n LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1085,7 +1085,7 @@
"\n LogicalSort(sort0=[$3], dir0=[ASC])",
"\n LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
"\n LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1098,7 +1098,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), MAX($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2], $2=[REVERSE($0)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1111,7 +1111,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[AND(>($2, 42), OR(=($0, 'chewbacca':VARCHAR(9)), =($0, 'vader':VARCHAR(9)), =($0, 'yoda':VARCHAR(9))))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1125,7 +1125,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($1), MAX($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2], $2=[REVERSE(CONCAT($0, ' ', $1))])",
"\n LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'baz'), <>($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1139,7 +1139,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$1=[$5])",
"\n LogicalWindow(window#0=[window(partition {2} aggs [SUM($1), COUNT($1), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[2]])",
+ "\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalProject(col1=[$0], col3=[$2], $2=[REVERSE(CONCAT($0, '-', $1))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1152,9 +1152,9 @@
"Execution Plan",
"\nLogicalProject($0=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1167,9 +1167,9 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[$2], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($1), SUM($1), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0, 1}])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 2}])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1182,9 +1182,9 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], $1=[$2], $2=[$3])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($0), SUM($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], EXPR$0=[$SUM0($1)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], EXPR$0=[$SUM0($2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1197,10 +1197,10 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0), MAX($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1216,10 +1216,10 @@
"\n LogicalSort(sort0=[$3], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4], col3=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0), MAX($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
"\n LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1700,7 +1700,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1714,7 +1714,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1728,7 +1728,7 @@
"Execution Plan",
"\nLogicalProject($0=[$1])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1741,7 +1741,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1754,7 +1754,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1767,7 +1767,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$0], avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1780,7 +1780,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MAX($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1796,7 +1796,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MIN($2)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1812,7 +1812,7 @@
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1828,7 +1828,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MIN($2)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1844,7 +1844,7 @@
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1857,7 +1857,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] aggs [COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col2=[$1], col3=[$2], $2=[SUBSTR($0, 0, 2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1870,7 +1870,7 @@
"Execution Plan",
"\nLogicalProject(col2=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1883,7 +1883,7 @@
"output": [
"Execution Plan",
"\nLogicalWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1])",
"\n LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1897,7 +1897,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2], $2=[CONCAT($0, '-', $1)])",
"\n LogicalFilter(condition=[OR(AND(<>($0, 'bar'), <>($0, 'foo')), >=($2, 42))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1911,7 +1911,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($0), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1924,7 +1924,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1 DESC] aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1937,7 +1937,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1950,7 +1950,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1 ASC-nulls-first] aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1963,7 +1963,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1 DESC-nulls-last] aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1976,7 +1976,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MAX($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1990,7 +1990,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MAX($2), COUNT($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2003,7 +2003,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2016,7 +2016,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$0], avg=[/(CAST($2):DOUBLE NOT NULL, $3)], min=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2029,7 +2029,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2045,7 +2045,7 @@
"\n LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
"\n LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
"\n LogicalWindow(window#0=[window(partition {0, 1} order by [1, 0] aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2061,7 +2061,7 @@
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2077,7 +2077,7 @@
"\n LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
"\n LogicalWindow(window#0=[window(partition {0, 1} order by [1, 0] aggs [SUM($2), COUNT($2)])])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2090,7 +2090,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), MAX($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2], $2=[REVERSE($0)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2103,7 +2103,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalFilter(condition=[AND(>($2, 42), OR(=($0, 'chewbacca':VARCHAR(9)), =($0, 'vader':VARCHAR(9)), =($0, 'yoda':VARCHAR(9))))])",
"\n LogicalTableScan(table=[[a]])",
@@ -2117,7 +2117,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MIN($1), MAX($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2], $2=[REVERSE(CONCAT($0, ' ', $1))])",
"\n LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'baz'), <>($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -2131,7 +2131,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$1=[$5])",
"\n LogicalWindow(window#0=[window(partition {2} order by [2] aggs [SUM($1), COUNT($1), COUNT($0)])])",
- "\n LogicalExchange(distribution=[hash[2]])",
+ "\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalProject(col1=[$0], col3=[$2], $2=[REVERSE(CONCAT($0, '-', $1))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2145,7 +2145,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$1], EXPR$1=[$1])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject($0=[REVERSE(CONCAT($0, '-', $1))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2158,7 +2158,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0 DESC] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2171,7 +2171,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0 ASC-nulls-first] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2184,7 +2184,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0 DESC-nulls-last] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2615,13 +2615,13 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], col10=[$2], $2=[$3])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$1], col10=[$2])",
"\n LogicalJoin(condition=[=($0, $3)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[1]])",
+ "\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalTableScan(table=[[b]])",
"\n"
@@ -2636,7 +2636,7 @@
"\n LogicalWindow(window#0=[window(order by [2 DESC, 0] aggs [SUM($1), COUNT($1)])])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 2}], EXPR$1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2651,7 +2651,7 @@
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col1=[$0], EXPR$1=[$2])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 2}], EXPR$1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2666,7 +2666,7 @@
"\n LogicalWindow(window#0=[window(partition {0} order by [2 DESC, 0] aggs [MAX($1)])])",
"\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2 DESC, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
- "\n LogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{0, 2}], EXPR$1=[COUNT()])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2711,11 +2711,11 @@
"\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalJoin(condition=[=($0, $3)], joinType=[inner])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[>($2, 100)])",
"\n LogicalTableScan(table=[[a]])",
- "\n LogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1])",
"\n LogicalFilter(condition=[OR(=($0, 'brandon sanderson':VARCHAR(17)), =($0, 'douglas adams':VARCHAR(17)))])",
"\n LogicalTableScan(table=[[b]])",
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 6fa02a216a..914c6c2ed0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -220,7 +220,7 @@ public class QueryRunner {
new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
serverQueryRequests, sendNode.getDataSchema());
MailboxSendOperator mailboxSendOperator =
- new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(),
+ new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getDistributionType(),
sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
sendNode.isSortOnSender(), sendNode.getReceiverStageId());
return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 68fcd82f67..722790d867 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -71,14 +71,14 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PhysicalPlanContext context) {
if (node.isSortOnReceiver()) {
SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
- new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+ new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getDistributionType(),
node.getDataSchema(), node.getCollationKeys(), node.getCollationDirections(),
node.getCollationNullDirections(), node.isSortOnSender(), node.getSenderStageId());
context.addReceivingMailboxIds(sortedMailboxReceiveOperator.getMailboxIds());
return sortedMailboxReceiveOperator;
} else {
MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+ new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getDistributionType(),
node.getSenderStageId());
context.addReceivingMailboxIds(mailboxReceiveOperator.getMailboxIds());
return mailboxReceiveOperator;
@@ -88,7 +88,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
- return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getExchangeType(),
+ return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getDistributionType(),
node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
node.getReceiverStageId());
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
index 03ea0b2115..49add00a11 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.plan.pipeline;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
@@ -40,9 +41,7 @@ public class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Voi
@Override
public Void visitMailboxReceive(MailboxReceiveNode node, PipelineBreakerContext context) {
process(node, context);
- // TODO: actually implement pipeline breaker attribute in PlanNode
- // currently all mailbox receive node from leaf stage is considered as pipeline breaker node.
- if (context.isLeafStage()) {
+ if (node.getExchangeType() == PinotRelExchangeType.PIPELINE_BREAKER) {
context.addPipelineBreaker(node);
}
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org