You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/17 16:48:40 UTC
(pinot) branch master updated: [multistage][feature] support RelDistribution trait planning (#11976)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 09da0eac74 [multistage][feature] support RelDistribution trait planning (#11976)
09da0eac74 is described below
commit 09da0eac748d15dee53449c517e83c862dc238b6
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Nov 17 08:48:34 2023 -0800
[multistage][feature] support RelDistribution trait planning (#11976)
* trait rule to attach rel-distribution to RelNode
* explicitly mark MailboxSendNode as partitioned in explain, which makes it easy to spot
* add back 'is_colocated_by_join_keys' option
* adjust rules for partition worker/mailbox assignment
* enhance physical explain to show partition info and inlined mailbox send/receive naming
* testing
* fix test physical plans that currently don't contain a leaf boundary exchange
* add more runtime tests to ensure the results are correct when the hinted hash/partition exchange is applied
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../apache/calcite/rel/hint/PinotHintOptions.java | 4 +
.../rel/rules/PinotJoinToDynamicBroadcastRule.java | 11 +-
.../rel/rules/PinotRelDistributionTraitRule.java | 177 +++++++++
.../org/apache/pinot/query/QueryEnvironment.java | 102 +++--
.../apache/pinot/query/context/PlannerContext.java | 13 +-
.../query/planner/PhysicalExplainPlanVisitor.java | 55 ++-
.../query/planner/logical/LogicalPlanner.java | 6 +-
.../planner/logical/PinotLogicalQueryPlanner.java | 2 +-
.../query/planner/logical/PlanFragmenter.java | 3 +-
.../planner/logical/RelToPlanNodeConverter.java | 7 +-
.../planner/physical/DispatchablePlanMetadata.java | 12 +-
.../planner/physical/DispatchablePlanVisitor.java | 3 +-
.../planner/physical/MailboxAssignmentVisitor.java | 3 +-
.../pinot/query/planner/plannode/ExchangeNode.java | 10 +-
.../query/planner/plannode/MailboxSendNode.java | 21 +-
.../apache/pinot/query/routing/WorkerManager.java | 4 +-
.../apache/pinot/query/QueryCompilationTest.java | 12 +-
.../resources/queries/ExplainPhysicalPlans.json | 414 ++++++++++++++++++---
.../src/test/resources/queries/QueryHints.json | 196 +++++++++-
19 files changed, 916 insertions(+), 139 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index ba92d43741..b97d3107bc 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -80,6 +80,10 @@ public class PinotHintOptions {
* BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
*/
public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";
+ /**
+ * Indicates that the join operator(s) within a certain selection scope are colocated by their join keys.
+ * E.g. when set on a dynamic-broadcast join, the right side is hash-distributed on its join keys instead of being
+ * broadcast.
+ */
+ public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys";
}
public static class TableHintOptions {
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
index e4cc9852c0..1e80585c7d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
@@ -155,8 +155,15 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex
? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
- PinotLogicalExchange dynamicBroadcastExchange =
- PinotLogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED,
+ // when colocated join hint is given, dynamic broadcast exchange can be hash-distributed b/c
+ // 1. currently, dynamic broadcast only works against main table off leaf-stage; (e.g. receive node on leaf)
+ // 2. when hash key are the same but hash functions are different, it can be done via normal hash shuffle.
+ boolean isColocatedJoin = PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
+ PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
+ PinotLogicalExchange dynamicBroadcastExchange = isColocatedJoin
+ ? PinotLogicalExchange.create(right.getInput(), RelDistributions.hash(join.analyzeCondition().rightKeys),
+ PinotRelExchangeType.PIPELINE_BREAKER)
+ : PinotLogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED,
PinotRelExchangeType.PIPELINE_BREAKER);
Join dynamicFilterJoin =
new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRelDistributionTraitRule.java
new file mode 100644
index 0000000000..71008e4fd9
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRelDistributionTraitRule.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.PinotHintStrategyTable;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+
+
+/**
+ * Special rule for Pinot: populates the {@link RelDistribution} trait across the entire relational tree.
+ *
+ * <p>We implement this rule as a workaround because {@link org.apache.calcite.plan.RelTraitPropagationVisitor} is
+ * deprecated. Every node is associated with a RelDistribution that is either derived from its inputs
+ * ({@link RelNode#getInputs()}) or computed from the node itself (via hints, or special handling of the node type).
+ *
+ * <p>The rule is meant to run in a HepProgram with {@code BOTTOM_UP} match order, so that a node's inputs already
+ * carry their distribution trait when the node itself is matched.
+ */
+public class PinotRelDistributionTraitRule extends RelOptRule {
+  public static final PinotRelDistributionTraitRule INSTANCE =
+      new PinotRelDistributionTraitRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+  public PinotRelDistributionTraitRule(RelBuilderFactory factory) {
+    // pass the factory through instead of silently falling back to RelOptRule's default builder factory
+    super(operand(RelNode.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    return call.rels.length >= 1;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    RelNode current = call.rel(0);
+    List<RelNode> inputs = current.getInputs();
+    // leaf nodes compute their own distribution; all other nodes derive it from their inputs.
+    RelDistribution relDistribution = (inputs == null || inputs.isEmpty())
+        ? computeCurrentDistribution(current) : deriveDistribution(current);
+    call.transformTo(attachTrait(current, relDistribution));
+  }
+
+  /**
+   * Re-creates {@code relNode} with {@code trait} added on top of the cluster's default trait set.
+   *
+   * <p>Currently, Pinot has the {@link RelTraitSet} default set to empty, so we directly pull the cluster trait set
+   * and then plus the {@link RelDistribution} trait. Hints are passed explicitly: the hint-less
+   * {@link LogicalJoin} / {@link LogicalTableScan} constructors would silently drop join and table hints.
+   */
+  private static RelNode attachTrait(RelNode relNode, RelTrait trait) {
+    RelTraitSet traitSet = relNode.getCluster().traitSet().plus(trait);
+    if (relNode instanceof LogicalJoin) {
+      // work around {@link LogicalJoin#copy(RelTraitSet, RexNode, RelNode, RelNode, JoinRelType, boolean)} not copying
+      // properly
+      LogicalJoin join = (LogicalJoin) relNode;
+      return new LogicalJoin(join.getCluster(), traitSet, join.getHints(), join.getLeft(), join.getRight(),
+          join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
+          ImmutableList.copyOf(join.getSystemFieldList()));
+    } else if (relNode instanceof LogicalTableScan) {
+      LogicalTableScan tableScan = (LogicalTableScan) relNode;
+      return new LogicalTableScan(tableScan.getCluster(), traitSet, tableScan.getHints(), tableScan.getTable());
+    } else {
+      return relNode.copy(traitSet, relNode.getInputs());
+    }
+  }
+
+  /**
+   * Derives the distribution of {@code node} from the distribution trait of its first input, falling back to
+   * {@link #computeCurrentDistribution(RelNode)} whenever it cannot be derived.
+   */
+  private static RelDistribution deriveDistribution(RelNode node) {
+    List<RelNode> inputs = node.getInputs();
+    // inputs may be HepRelVertex wrappers; always read the trait from the unboxed rel.
+    RelNode input = PinotRuleUtils.unboxRel(inputs.get(0));
+    RelDistribution inputRelDistribution = input.getTraitSet().getDistribution();
+    if (node instanceof PinotLogicalExchange) {
+      // TODO: derive from input first, only if the result is ANY we change it to current
+      return computeCurrentDistribution(node);
+    } else if (node instanceof LogicalProject) {
+      assert inputs.size() == 1;
+      // getMapping() is null when the project is not a pure field mapping (e.g. it computes expressions).
+      Mappings.TargetMapping mapping = ((LogicalProject) node).getMapping();
+      if (inputRelDistribution != null && mapping != null) {
+        try {
+          return inputRelDistribution.apply(mapping);
+        } catch (RuntimeException ignored) {
+          // best-effort: fall back to computeCurrentDistribution(node) below
+        }
+      }
+    } else if (node instanceof LogicalFilter) {
+      assert inputs.size() == 1;
+      if (inputRelDistribution != null) {
+        // a filter only removes rows, so the input distribution is kept as-is.
+        return inputRelDistribution;
+      }
+    } else if (node instanceof LogicalAggregate) {
+      assert inputs.size() == 1;
+      if (inputRelDistribution != null) {
+        // create a mapping that only contains the group set
+        LogicalAggregate agg = (LogicalAggregate) node;
+        List<Integer> groupSetIndices = new ArrayList<>();
+        agg.getGroupSet().forEach(groupSetIndices::add);
+        return inputRelDistribution.apply(Mappings.target(groupSetIndices, input.getRowType().getFieldCount()));
+      }
+    } else if (node instanceof LogicalJoin) {
+      // TODO: we only map a single RelTrait from the LEFT table, later we should support RIGHT table as well
+      assert inputs.size() == 2;
+      if (inputRelDistribution != null) {
+        // Since we only support LEFT RelTrait propagation, the inputRelDistribution can directly be applied
+        // b/c the Join node always puts left relation RowTypes then right relation RowTypes sequentially.
+        return inputRelDistribution;
+      }
+    }
+    // TODO: add the rest of the nodes.
+    return computeCurrentDistribution(node);
+  }
+
+  /**
+   * Computes the distribution a node establishes by itself, without looking at its inputs:
+   * <ul>
+   *   <li>{@link Exchange}: its declared distribution;</li>
+   *   <li>{@link LogicalTableScan}: hash on the column named by the
+   *       {@link PinotHintOptions.TableHintOptions#PARTITION_KEY} table hint, random otherwise;</li>
+   *   <li>{@link LogicalAggregate} hinted as FINAL or DIRECT: hash on its group set, random otherwise
+   *       (including when the internal agg-type hint is absent);</li>
+   *   <li>any other node: random.</li>
+   * </ul>
+   *
+   * @throws IllegalArgumentException if the partition-key hint names a column missing from the scan's row type
+   */
+  private static RelDistribution computeCurrentDistribution(RelNode node) {
+    if (node instanceof Exchange) {
+      return ((Exchange) node).getDistribution();
+    } else if (node instanceof LogicalTableScan) {
+      LogicalTableScan tableScan = (LogicalTableScan) node;
+      // convert table scan hints into rel trait
+      String partitionKey =
+          PinotHintStrategyTable.getHintOption(tableScan.getHints(), PinotHintOptions.TABLE_HINT_OPTIONS,
+              PinotHintOptions.TableHintOptions.PARTITION_KEY);
+      if (partitionKey != null) {
+        return RelDistributions.hash(ImmutableList.of(getPartitionIndex(tableScan, partitionKey)));
+      }
+    } else if (node instanceof LogicalAggregate) {
+      LogicalAggregate agg = (LogicalAggregate) node;
+      String aggTypeHint = PinotHintStrategyTable.getHintOption(agg.getHints(), PinotHintOptions.INTERNAL_AGG_OPTIONS,
+          PinotHintOptions.InternalAggregateOptions.AGG_TYPE);
+      // a missing hint must not reach valueOf(null), which throws NullPointerException
+      AggregateNode.AggType aggType = aggTypeHint == null ? null : AggregateNode.AggType.valueOf(aggTypeHint);
+      if (aggType == AggregateNode.AggType.FINAL || aggType == AggregateNode.AggType.DIRECT) {
+        List<Integer> groupSetIndices = new ArrayList<>();
+        agg.getGroupSet().forEach(groupSetIndices::add);
+        return RelDistributions.hash(groupSetIndices);
+      }
+    }
+    return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY);
+  }
+
+  /**
+   * Returns the row-type index of the hinted partition key column of {@code tableScan}.
+   *
+   * @throws IllegalArgumentException if no such column exists (instead of an opaque NullPointerException)
+   */
+  private static int getPartitionIndex(LogicalTableScan tableScan, String partitionKey) {
+    if (tableScan.getRowType().getField(partitionKey, true, true) == null) {
+      throw new IllegalArgumentException("Partition key: " + partitionKey + " from table hint is not a column of table: "
+          + tableScan.getTable().getQualifiedName());
+    }
+    return tableScan.getRowType().getField(partitionKey, true, true).getIndex();
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 4ac42a4306..fc210ed26c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepMatchOrder;
@@ -42,6 +43,7 @@ import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.rules.PinotQueryRuleSets;
+import org.apache.calcite.rel.rules.PinotRelDistributionTraitRule;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.runtime.CalciteContextException;
@@ -89,7 +91,8 @@ public class QueryEnvironment {
private final Prepare.CatalogReader _catalogReader;
private final RelDataTypeFactory _typeFactory;
- private final HepProgram _hepProgram;
+ // rule-based optimization program, applied first to the converted plan
+ private final HepProgram _optProgram;
+ // trait-attachment program, applied to the output of the optimization program
+ private final HepProgram _traitProgram;
// Pinot extensions
private final WorkerManager _workerManager;
@@ -121,38 +124,8 @@ public class QueryEnvironment {
.addRelBuilderConfigTransform(c -> c.withPushJoinCondition(true))
.addRelBuilderConfigTransform(c -> c.withAggregateUnique(true)))
.build();
-
- HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
- // Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's
- // best to be explicit.
- hepProgramBuilder.addMatchOrder(HepMatchOrder.DEPTH_FIRST);
-
- // ----
- // Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:
- // the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal.
- for (RelOptRule relOptRule : PinotQueryRuleSets.BASIC_RULES) {
- hepProgramBuilder.addRuleInstance(relOptRule);
- }
-
- // ----
- // Run Pinot rule to attach aggregation auxiliary info
- hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PINOT_AGG_PROCESS_RULES);
-
- // ----
- // Pushdown filters using a single HepInstruction.
- hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);
-
- // ----
- // Prune duplicate/unnecessary nodes using a single HepInstruction.
- // TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help.
- hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PRUNE_RULES);
-
- // ----
- // Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule.
- for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
- hepProgramBuilder.addRuleInstance(relOptRule);
- }
- _hepProgram = hepProgramBuilder.build();
+ _optProgram = getOptProgram();
+ _traitProgram = getTraitProgram();
}
/**
@@ -168,7 +141,8 @@ public class QueryEnvironment {
* @return QueryPlannerResult containing the dispatchable query plan and the relRoot.
*/
public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
- try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
+ try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram,
+ _traitProgram)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
// TODO: current code only assume one SubPlan per query, but we should support multiple SubPlans per query.
@@ -196,7 +170,8 @@ public class QueryEnvironment {
* @return QueryPlannerResult containing the explained query plan and the relRoot.
*/
public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
- try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
+ try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram,
+ _traitProgram)) {
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
@@ -229,7 +204,8 @@ public class QueryEnvironment {
}
public List<String> getTableNamesForQuery(String sqlQuery) {
- try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
+ try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram,
+ _traitProgram)) {
SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery).getSqlNode();
if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
sqlNode = ((SqlExplain) sqlNode).getExplicandum();
@@ -335,8 +311,12 @@ public class QueryEnvironment {
// 4. optimize relNode
// TODO: add support for traits, cost factory.
try {
- plannerContext.getRelOptPlanner().setRoot(relRoot.rel);
- return plannerContext.getRelOptPlanner().findBestExp();
+ RelOptPlanner optPlanner = plannerContext.getRelOptPlanner();
+ optPlanner.setRoot(relRoot.rel);
+ RelNode optimized = optPlanner.findBestExp();
+ RelOptPlanner traitPlanner = plannerContext.getRelTraitPlanner();
+ traitPlanner.setRoot(optimized);
+ return traitPlanner.findBestExp();
} catch (Exception e) {
throw new UnsupportedOperationException(
"Cannot generate a valid execution plan for the given query: " + RelOptUtil.toString(relRoot.rel), e);
@@ -364,4 +344,50 @@ public class QueryEnvironment {
private HintStrategyTable getHintStrategyTable() {
return PinotHintStrategyTable.PINOT_HINT_STRATEGY_TABLE;
}
+
+  /**
+   * Creates the {@link HepProgram} for the logical optimization phase: Calcite core rules, then Pinot aggregation
+   * processing, filter push-down and pruning, in that order.
+   */
+  private static HepProgram getOptProgram() {
+    HepProgramBuilder builder = new HepProgramBuilder();
+    // DEPTH_FIRST behaves like the (arbitrary) default match order, but being explicit is clearer.
+    builder.addMatchOrder(HepMatchOrder.DEPTH_FIRST);
+
+    // Calcite CORE rules: one HepInstruction per rule, so each rule is the only one evaluated in its own
+    // graph traversal.
+    for (RelOptRule coreRule : PinotQueryRuleSets.BASIC_RULES) {
+      builder.addRuleInstance(coreRule);
+    }
+
+    // Each collection below runs as a single HepInstruction:
+    // 1. Pinot rules attaching aggregation auxiliary info,
+    // 2. filter push-down,
+    // 3. pruning of duplicate/unnecessary nodes (TODO: consider HepMatchOrder.TOP_DOWN if it helps).
+    return builder
+        .addRuleCollection(PinotQueryRuleSets.PINOT_AGG_PROCESS_RULES)
+        .addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES)
+        .addRuleCollection(PinotQueryRuleSets.PRUNE_RULES)
+        .build();
+  }
+
+  /**
+   * Creates the {@link HepProgram} that runs on the already-optimized plan: Pinot post-processing rules, followed
+   * by {@link PinotRelDistributionTraitRule}, which attaches a {@code RelDistribution} trait to every node.
+   */
+  private static HepProgram getTraitProgram() {
+    // BOTTOM_UP: descendants are matched before their ancestors.
+    HepProgramBuilder builder = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+
+    // Pinot-specific rules that must run after all other rules, one HepInstruction per rule.
+    for (RelOptRule postRule : PinotQueryRuleSets.PINOT_POST_RULES) {
+      builder.addRuleInstance(postRule);
+    }
+
+    // finally, attach the RelDistribution trait to all nodes
+    return builder.addRuleInstance(PinotRelDistributionTraitRule.INSTANCE).build();
+  }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
index 564fc17468..69a2682afa 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
@@ -18,12 +18,14 @@
*/
package org.apache.pinot.query.context;
+import java.util.Collections;
import java.util.Map;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.prepare.PlannerImpl;
import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.FrameworkConfig;
@@ -43,14 +45,17 @@ public class PlannerContext implements AutoCloseable {
private final SqlValidator _validator;
private final RelOptPlanner _relOptPlanner;
+ // second planner, used to attach RelDistribution traits to the plan produced by _relOptPlanner
+ private final LogicalPlanner _relTraitPlanner;
private Map<String, String> _options;
public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReader, RelDataTypeFactory typeFactory,
- HepProgram hepProgram) {
+ HepProgram optProgram, HepProgram traitProgram) {
_planner = new PlannerImpl(config);
_validator = new Validator(config.getOperatorTable(), catalogReader, typeFactory);
- _relOptPlanner = new LogicalPlanner(hepProgram, Contexts.EMPTY_CONTEXT);
+ // planner for the optimization program, registered with the framework config's trait defs
+ // NOTE(review): FrameworkConfig#getTraitDefs() may return null when no trait defs were configured -- confirm
+ // LogicalPlanner tolerates a null list.
+ _relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT, config.getTraitDefs());
+ // separate planner for the trait program; only the RelDistribution trait def is registered
+ _relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.EMPTY_CONTEXT,
+ Collections.singletonList(RelDistributionTraitDef.INSTANCE));
}
public PlannerImpl getPlanner() {
@@ -65,6 +70,10 @@ public class PlannerContext implements AutoCloseable {
return _relOptPlanner;
}
+  /**
+   * @return the planner that runs the trait program; it is created with only {@link RelDistributionTraitDef}
+   *     registered as a trait definition.
+   */
+  public LogicalPlanner getRelTraitPlanner() {
+    return this._relTraitPlanner;
+  }
+
public void setOptions(Map<String, String> options) {
_options = options;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
index 245f531522..c51d24f5d3 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
@@ -18,9 +18,12 @@
*/
package org.apache.pinot.query.planner;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -37,6 +40,7 @@ import org.apache.pinot.query.planner.plannode.TableScanNode;
import org.apache.pinot.query.planner.plannode.ValueNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.VirtualServerAddress;
/**
@@ -92,6 +96,15 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
.toString();
}
+ /**
+ * This wrapper prints out contextual info from {@link Context} before invoking {@link PlanNode#explain()}.
+ * The format of the contextual info is always:
+ * "`PREFIX`[`FRAGMENT_ID`]@`HOSTNAME`:`PORT`|[`WORKER_ID`(s)] `EXPLAIN`"
+ *
+ * @param node the {@link PlanNode} to be explained
+ * @param context the {@link Context} to be wrapped in front ot plan node explain.
+ * @return stringify format of the explained result wrapped with contextual info.
+ */
private StringBuilder appendInfo(PlanNode node, Context context) {
int planFragmentId = node.getPlanFragmentId();
context._builder
@@ -102,7 +115,9 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
.append(context._host.getHostname())
.append(':')
.append(context._host.getQueryServicePort())
- .append(' ')
+ .append("|[")
+ .append(context._workerId)
+ .append("] ")
.append(node.explain());
return context._builder;
}
@@ -185,16 +200,26 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
return node.getInputs().get(0).visit(this, context.next(false, context._host, context._workerId));
}
+ /**
+ * Print out mailbox sending info.
+ *
+ * Noted that when print out mailbox sending info. the receiving side follows the contextual info format defined in
+ * {@link PhysicalExplainPlanVisitor#appendInfo(PlanNode, Context)}.
+ *
+ * e.g. the RECEIVERs are printed as:
+ * "{[`FRAGMENT_ID`]@`HOSTNAME`:`PORT`|[`WORKER_ID`(s)]}" and are comma-separated.
+ */
private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
appendInfo(node, context);
int receiverStageId = node.getReceiverStageId();
- Map<QueryServerInstance, List<Integer>> servers =
- _dispatchableSubPlan.getQueryStageList().get(receiverStageId)
- .getServerInstanceToWorkerIdMap();
+ List<VirtualServerAddress> serverAddressList =
+ _dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId())
+ .getWorkerMetadataList().get(context._workerId)
+ .getMailBoxInfosMap().get(receiverStageId).getVirtualAddressList();
+ List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList);
context._builder.append("->");
- String receivers = servers.entrySet().stream()
- .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+ String receivers = serverInstanceToWorkerIdList.stream()
.map(s -> "[" + receiverStageId + "]@" + s)
.collect(Collectors.joining(",", "{", "}"));
return context._builder.append(receivers);
@@ -252,7 +277,23 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
}
}
+  /**
+   * Groups virtual server addresses by host and port and renders each group via
+   * {@link #stringifyQueryServerInstanceToWorkerIdsEntry(Map.Entry)}. Groups are ordered by
+   * {@link QueryServerInstance#toString()} so the printed order is deterministic.
+   */
+  public static List<String> stringifyVirtualServerAddresses(List<VirtualServerAddress> serverAddressList) {
+    Map<QueryServerInstance, List<Integer>> workerIdsByServer =
+        new TreeMap<>(Comparator.comparing(QueryServerInstance::toString));
+    for (VirtualServerAddress address : serverAddressList) {
+      // -1 placeholder for the third constructor argument; only hostname and port are printed.
+      QueryServerInstance server = new QueryServerInstance(address.hostname(), address.port(), -1);
+      workerIdsByServer.computeIfAbsent(server, ignored -> new ArrayList<>()).add(address.workerId());
+    }
+    List<String> rendered = new ArrayList<>(workerIdsByServer.size());
+    for (Map.Entry<QueryServerInstance, List<Integer>> entry : workerIdsByServer.entrySet()) {
+      rendered.add(stringifyQueryServerInstanceToWorkerIdsEntry(entry));
+    }
+    return rendered;
+  }
+
+ /**
+ * Renders a server-to-worker-ids entry as "`HOSTNAME`:`QUERY_SERVICE_PORT`|[`WORKER_ID`(s)]".
+ */
public static String stringifyQueryServerInstanceToWorkerIdsEntry(Map.Entry<QueryServerInstance, List<Integer>> e) {
- return e.getKey() + "|" + e.getValue();
+ QueryServerInstance server = e.getKey();
+ return server.getHostname() + ":" + server.getQueryServicePort() + "|" + e.getValue();
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java
index 0e317560e9..e8dd96f7dd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java
@@ -35,8 +35,12 @@ public class LogicalPlanner extends HepPlanner {
private List<RelTraitDef> _traitDefs;
public LogicalPlanner(HepProgram program, Context context) {
+ this(program, context, new ArrayList<>());
+ }
+
+ /**
+ * @param traitDefs trait definitions held by this planner. The list is copied, so callers may pass immutable
+ * lists (e.g. Collections.singletonList); {@code null} (FrameworkConfig#getTraitDefs() may return null) is
+ * treated as an empty list, matching the previous default.
+ */
+ public LogicalPlanner(HepProgram program, Context context, List<RelTraitDef> traitDefs) {
super(program, context);
- _traitDefs = new ArrayList();
+ _traitDefs = traitDefs == null ? new ArrayList<>() : new ArrayList<>(traitDefs);
}
@Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 07df7845b8..4b924316e7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -100,7 +100,7 @@ public class PinotLogicalQueryPlanner {
PlanNode subPlanRootSenderNode =
new MailboxSendNode(subPlanRoot.getPlanFragmentId(), subPlanRoot.getDataSchema(), 0,
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
- false);
+ false, false);
subPlanRootSenderNode.addInput(subPlanRoot);
PlanFragment planFragment1 =
new PlanFragment(1, subPlanRootSenderNode, new PlanFragmentMetadata(), new ArrayList<>());
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
index 9d7e5a2fdd..269db82f07 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
@@ -159,7 +159,8 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? node.getDistributionKeys() : null;
MailboxSendNode mailboxSendNode =
new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId,
- distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender());
+ distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
+ node.isPartitioned());
mailboxSendNode.addInput(nextPlanFragmentRoot);
_planFragmentMap.put(senderPlanFragmentId,
new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>()));
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index b9e00711af..03d8ec957b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Exchange;
@@ -125,13 +126,17 @@ public final class RelToPlanNodeConverter {
exchangeType = ((PinotLogicalExchange) node).getExchangeType();
}
}
+ RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
+ boolean isPartitioned = inputDistributionTrait != null
+ && inputDistributionTrait.getType() == RelDistribution.Type.HASH_DISTRIBUTED
+ && inputDistributionTrait == node.getDistribution();
List<RelFieldCollation> fieldCollations = (collation == null) ? null : collation.getFieldCollations();
// Compute all the tables involved under this exchange node
Set<String> tableNames = getTableNamesFromRelRoot(node);
return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), exchangeType, tableNames,
- node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver);
+ node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver, isPartitioned);
}
private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 48378d4e90..fb57c2c3d1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -68,8 +68,8 @@ public class DispatchablePlanMetadata implements Serializable {
// whether a stage requires singleton instance to execute, e.g. stage contains global reduce (sort/agg) operator.
private boolean _requiresSingletonInstance;
- // whether a stage is partitioned table scan
- private boolean _isPartitionedTableScan;
+ // whether the stage's data is already partitioned the same way its sending exchange distributes it
+ private boolean _isPartitioned;
private int _partitionParallelism;
public DispatchablePlanMetadata() {
@@ -136,12 +136,12 @@ public class DispatchablePlanMetadata implements Serializable {
_requiresSingletonInstance = _requiresSingletonInstance || newRequireInstance;
}
- public boolean isPartitionedTableScan() {
- return _isPartitionedTableScan;
+ public boolean isPartitioned() {
+ return _isPartitioned;
}
- public void setPartitionedTableScan(boolean isPartitionedTableScan) {
- _isPartitionedTableScan = isPartitionedTableScan;
+ public void setPartitioned(boolean isPartitioned) {
+ _isPartitioned = isPartitioned;
}
public int getPartitionParallelism() {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index d177a1d892..1511ebaf52 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -104,7 +104,8 @@ public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, Dispatchab
@Override
public Void visitMailboxSend(MailboxSendNode node, DispatchablePlanContext context) {
node.getInputs().get(0).visit(this, context);
- getOrCreateDispatchablePlanMetadata(node, context);
+ DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
+ dispatchablePlanMetadata.setPartitioned(node.isPartitioned());
context.getDispatchablePlanStageRootMap().put(node.getPlanFragmentId(), node);
return null;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 519d4d14b3..6fb9c4e798 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -67,7 +67,8 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
}
- } else if (senderMetadata.isPartitionedTableScan() && (numReceivers / numSenders > 0)) {
+ } else if (senderMetadata.isPartitioned() && senderMetadata.getScannedTables().size() > 0
+ && (numReceivers / numSenders > 0)) {
// For partitioned table scan, send the data to the worker with the same worker id (not necessary the same
// instance). When partition parallelism is configured, send the data to the corresponding workers.
// NOTE: Do not use partitionParallelism from the metadata because it might be configured only in the first
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index 1d136efb20..201e735c82 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -48,6 +48,9 @@ public class ExchangeNode extends AbstractPlanNode {
@ProtoProperties
private boolean _isSortOnReceiver = false;
+ @ProtoProperties
+ private boolean _isPartitioned = false;
+
@ProtoProperties
private List<RelFieldCollation> _collations;
@@ -63,13 +66,14 @@ public class ExchangeNode extends AbstractPlanNode {
public ExchangeNode(int currentStageId, DataSchema dataSchema, PinotRelExchangeType exchangeType,
Set<String> tableNames, RelDistribution distribution, List<RelFieldCollation> collations, boolean isSortOnSender,
- boolean isSortOnReceiver) {
+ boolean isSortOnReceiver, boolean isPartitioned) {
super(currentStageId, dataSchema);
_exchangeType = exchangeType;
_keys = distribution.getKeys();
_distributionType = distribution.getType();
_isSortOnSender = isSortOnSender;
_isSortOnReceiver = isSortOnReceiver;
+ _isPartitioned = isPartitioned;
_collations = collations;
_tableNames = tableNames;
}
@@ -104,6 +108,10 @@ public class ExchangeNode extends AbstractPlanNode {
return _isSortOnReceiver;
}
+ public boolean isPartitioned() {
+ return _isPartitioned;
+ }
+
public List<RelFieldCollation> getCollations() {
return _collations;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index 1834ac90c1..20f12b7316 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -47,6 +47,8 @@ public class MailboxSendNode extends AbstractPlanNode {
private List<RelFieldCollation.Direction> _collationDirections;
@ProtoProperties
private boolean _isSortOnSender;
+ @ProtoProperties
+ private boolean _isPartitioned;
public MailboxSendNode(int planFragmentId) {
super(planFragmentId);
@@ -55,7 +57,7 @@ public class MailboxSendNode extends AbstractPlanNode {
public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverStageId,
RelDistribution.Type distributionType, PinotRelExchangeType exchangeType,
@Nullable List<Integer> distributionKeys, @Nullable List<RelFieldCollation> fieldCollations,
- boolean isSortOnSender) {
+ boolean isSortOnSender, boolean isPartitioned) {
super(planFragmentId, dataSchema);
_receiverStageId = receiverStageId;
_distributionType = distributionType;
@@ -75,6 +77,7 @@ public class MailboxSendNode extends AbstractPlanNode {
_collationDirections = Collections.emptyList();
}
_isSortOnSender = isSortOnSender;
+ _isPartitioned = isPartitioned;
}
public int getReceiverStageId() {
@@ -117,9 +120,23 @@ public class MailboxSendNode extends AbstractPlanNode {
return _isSortOnSender;
}
+ public boolean isPartitioned() {
+ return _isPartitioned;
+ }
+
@Override
public String explain() {
- return "MAIL_SEND(" + _distributionType + ")";
+ StringBuilder sb = new StringBuilder();
+ sb.append("MAIL_SEND(");
+ sb.append(_distributionType);
+ sb.append(')');
+ if (isPartitioned()) {
+ sb.append("[PARTITIONED]");
+ }
+ if (isSortOnSender()) {
+ sb.append("[SORTED]");
+ }
+ return sb.toString();
}
@Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 481cc7d593..5029321162 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -237,7 +237,7 @@ public class WorkerManager {
metadata.setWorkerIdToServerInstanceMap(workedIdToServerInstanceMap);
metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
- metadata.setPartitionedTableScan(true);
+ metadata.setPartitioned(metadata.isPartitioned());
metadata.setPartitionParallelism(partitionParallelism);
}
@@ -255,7 +255,7 @@ public class WorkerManager {
// worker in the first child.
if (!children.isEmpty()) {
DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId());
- if (firstChildMetadata.isPartitionedTableScan()) {
+ if (firstChildMetadata.isPartitioned() && firstChildMetadata.getScannedTables().size() > 0) {
int partitionParallelism = firstChildMetadata.getPartitionParallelism();
Map<Integer, QueryServerInstance> childWorkerIdToServerInstanceMap =
firstChildMetadata.getWorkerIdToServerInstanceMap();
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 6d59f8da22..28a386faa0 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -130,20 +130,20 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
.map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
- tableName.equals("a") ? ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]")
- : ImmutableList.of("localhost@{1,1}|[0]"));
+ tableName.equals("a") ? ImmutableList.of("localhost:1|[1]", "localhost:2|[0]")
+ : ImmutableList.of("localhost:1|[0]"));
} else if (!PlannerUtils.isRootPlanFragment(stageId)) {
// join stage should have both servers used.
Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
.map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
- ImmutableSet.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
+ ImmutableSet.of("localhost:1|[1]", "localhost:2|[0]"));
} else {
// reduce stage should have the reducer instance.
Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
.map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
- ImmutableSet.of("localhost@{3,3}|[0]"));
+ ImmutableSet.of("localhost:3|[0]"));
}
}
}
@@ -253,13 +253,13 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
.map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
- ImmutableList.of("localhost@{1,1}|[0]"));
+ ImmutableList.of("localhost:1|[0]"));
} else if (!PlannerUtils.isRootPlanFragment(stageId)) {
// join stage should have both servers used.
Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
.map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
.collect(Collectors.toSet()),
- ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
+ ImmutableList.of("localhost:1|[1]", "localhost:2|[0]"));
}
}
}
diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
index 9f9173a107..37b338da16 100644
--- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
@@ -5,74 +5,386 @@
"description": "explain plan with attributes",
"sql": "EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
"output": [
- "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
- "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
- "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
- " └── [1]@localhost:2 PROJECT\n",
- " └── [1]@localhost:2 TABLE SCAN (a) null\n"
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] PROJECT\n",
+ " └── [1]@localhost:2|[0] TABLE SCAN (a) null\n",
+ ""
]
},
{
"description": "explain plan without attributes",
"sql": "EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
"output": [
- "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
- "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
- "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
- " └── [1]@localhost:2 AGGREGATE_FINAL\n",
- " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
- " ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
- " └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
- " └── [2]@localhost:2 AGGREGATE_LEAF\n",
- " └── [2]@localhost:2 TABLE SCAN (a) null\n"
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [2]@localhost:2|[0] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:2|[0] TABLE SCAN (a) null\n",
+ ""
]
},
{
"description": "explain plan with join",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
"output": [
- "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
- "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
- "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
- " └── [1]@localhost:2 PROJECT\n",
- " └── [1]@localhost:2 JOIN\n",
- " ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
- " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
- " │ └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
- " │ └── [2]@localhost:2 PROJECT\n",
- " │ └── [2]@localhost:2 TABLE SCAN (a) null\n",
- " └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
- " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
- " └── [3]@localhost:1 PROJECT\n",
- " └── [3]@localhost:1 TABLE SCAN (b) null\n"
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] PROJECT\n",
+ " └── [1]@localhost:2|[0] JOIN\n",
+ " ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " │ └── [2]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " │ └── [2]@localhost:2|[0] PROJECT\n",
+ " │ └── [2]@localhost:2|[0] TABLE SCAN (a) null\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " └── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [3]@localhost:1|[0] PROJECT\n",
+ " └── [3]@localhost:1|[0] TABLE SCAN (b) null\n",
+ ""
]
},
{
- "description": "explain plan with join with colocated tables",
+ "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.name"
+ },
+ {
+ "description": "explain plan with join with colocated tables, not on partition key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col1 = b.col2 WHERE b.col3 > 0",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] PROJECT\n",
+ " └── [1]@localhost:2|[0] JOIN\n",
+ " ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " │ └── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [3]@localhost:1|[1] PROJECT\n",
+ " └── [3]@localhost:1|[1] FILTER\n",
+ " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with join with colocated tables, one-sided on partition key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col1 = b.col1 WHERE b.col3 > 0",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] PROJECT\n",
+ " └── [1]@localhost:2|[0] JOIN\n",
+ " ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " │ └── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [3]@localhost:1|[1] PROJECT\n",
+ " └── [3]@localhost:1|[1] FILTER\n",
+ " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with join with colocated tables, both-sided on partition key",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0",
"output": [
- "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
- "├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
- "├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
- "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
- "└── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
- " └── [1]@localhost:1 PROJECT\n",
- " └── [1]@localhost:1 JOIN\n",
- " ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
- " │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
- " │ ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
- " │ ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
- " │ └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
- " │ └── [2]@localhost:1 PROJECT\n",
- " │ └── [2]@localhost:1 TABLE SCAN (a) null\n",
- " └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
- " ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
- " ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
- " ├── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
- " └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
- " └── [3]@localhost:1 PROJECT\n",
- " └── [3]@localhost:1 FILTER\n",
- " └── [3]@localhost:1 TABLE SCAN (b) null\n"
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:1|[1] PROJECT\n",
+ " └── [1]@localhost:1|[1] JOIN\n",
+ " ├── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
+ " │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
+ " │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
+ " │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+ " │ └── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
+ " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+ " └── [3]@localhost:1|[1] PROJECT\n",
+ " └── [3]@localhost:1|[1] FILTER\n",
+ " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with semi-join + group-by with colocated tables",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, COUNT(*) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN ( SELECT b.col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col1 = 'z' ) GROUP BY a.col1",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] JOIN\n",
+ " ├── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [2]@localhost:1|[1] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+ " └── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n",
+ " └── [3]@localhost:1|[1] PROJECT\n",
+ " └── [3]@localhost:1|[1] FILTER\n",
+ " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with semi-join + group-by with colocated tables with colocated hints",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ a.col1, COUNT(*) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN ( SELECT b.col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col1 = 'z' ) GROUP BY a.col1",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] JOIN\n",
+ " ├── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+ " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+ " └── [3]@localhost:1|[1] PROJECT\n",
+ " └── [3]@localhost:1|[1] FILTER\n",
+ " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with direct group-by with pre-partitioned tables on partition key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ GROUP BY a.col2",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:1|[1] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with direct group-by with pre-partitioned tables on non-partition key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ GROUP BY a.col1",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with direct group-by with pre-partitioned tables on both partition and non-partition key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ GROUP BY a.col1, a.col2",
+ "comments": "TODO: currently we do not support minimum hash-set thus we repartition from col2 to (col1, col2), but once we support this should not have repartitioning.",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with direct inner join + group-by with colocated tables on join key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0 GROUP BY a.col2",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] JOIN\n",
+ " ├── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+ " │ ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+ " │ ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+ " │ └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+ " │ └── [3]@localhost:1|[1] PROJECT\n",
+ " │ └── [3]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+ " ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+ " ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+ " └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+ " └── [4]@localhost:1|[1] PROJECT\n",
+ " └── [4]@localhost:1|[1] FILTER\n",
+ " └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with direct inner join + group-by with colocated tables on non-join key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0 GROUP BY a.col1",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] JOIN\n",
+ " ├── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " │ ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+ " │ ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+ " │ ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+ " │ └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+ " │ └── [3]@localhost:1|[1] PROJECT\n",
+ " │ └── [3]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+ " ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+ " ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+ " └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+ " └── [4]@localhost:1|[1] PROJECT\n",
+ " └── [4]@localhost:1|[1] FILTER\n",
+ " └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with semi join from grouped right table, then group-by with colocated tables on join key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN ( SELECT col2 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0 GROUP BY col2 HAVING COUNT(*) > 1 ) GROUP BY a.col2",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:1|[1] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] JOIN\n",
+ " ├── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [2]@localhost:1|[1] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ " ├── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+ " └── [3]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n",
+ " └── [3]@localhost:2|[0] FILTER\n",
+ " └── [3]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [3]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]}\n",
+ " └── [4]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [4]@localhost:1|[1] PROJECT\n",
+ " └── [4]@localhost:1|[1] FILTER\n",
+ " └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
+ ]
+ },
+ {
+ "description": "explain plan with semi join from grouped right table, then group-by with colocated tables on non-join key",
+ "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN ( SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0 GROUP BY col1 HAVING COUNT(*) > 1 ) GROUP BY a.col1",
+ "output": [
+ "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+ "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+ " └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+ " └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+ " └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+ " └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [2]@localhost:1|[1] JOIN\n",
+ " ├── [2]@localhost:1|[1] PROJECT\n",
+ " │ └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+ " └── [2]@localhost:1|[1] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+ " ├── [3]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+ " ├── [3]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+ " └── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n",
+ " └── [3]@localhost:1|[1] FILTER\n",
+ " └── [3]@localhost:1|[1] AGGREGATE_FINAL\n",
+ " └── [3]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+ " ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[2]} (Subtree Omitted)\n",
+ " ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[3]} (Subtree Omitted)\n",
+ " ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[0]} (Subtree Omitted)\n",
+ " └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[1]}\n",
+ " └── [4]@localhost:1|[1] AGGREGATE_LEAF\n",
+ " └── [4]@localhost:1|[1] PROJECT\n",
+ " └── [4]@localhost:1|[1] FILTER\n",
+ " └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
+ ""
]
}
]
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index cd4856a7e7..c05f3158df 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -120,6 +120,7 @@
]
},
"hint_option_queries_unmatched_partition": {
+ "comments": "this section specifically deal with unmatched partition tagging of table options",
"tables": {
"tbl1": {
"schema": [
@@ -297,35 +298,61 @@
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num"
},
{
- "description": "join with colocated tables",
+ "description": "join with pre-partitioned left and right tables",
"sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0"
},
+ {
+ "description": "join with pre-partitioned left and right tables; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0"
+ },
{
"description": "group by with pre-partitioned tables on partition column",
"sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
},
{
"description": "group by with pre-partitioned tables on non-partition column",
- "ignored": true,
- "comment": "tableOption usage implies direct exchange (e.g. without shuffle) this should be fix in the future",
"sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.name"
},
{
- "description": "join with colocated tables then group-by left join key column",
+ "description": "join with pre-partitioned left and right tables then group-by left join key column",
"sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
},
{
- "description": "join with colocated tables then group-by left join key column and other columns",
+ "description": "join with pre-partitioned left and right tables then group-by left join key column; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
+ },
+ {
+ "description": "join with pre-partitioned left and right tables then group-by left join key column and other columns",
+ "sql": "SELECT {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
+ },
+ {
+ "description": "join with pre-partitioned left and right tables then group-by left join key column and other columns; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
+ },
+ {
+ "description": "join with pre-partitioned left and right tables then group-by left join key column and other columns",
"sql": "SELECT {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
},
{
- "description": "join with colocated tables then group-by other columns",
+ "description": "join with pre-partitioned left and right tables then group-by left join key column and other columns; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
+ },
+ {
+ "description": "join with pre-partitioned left and right tables then group-by other columns",
"sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
},
{
- "description": "agg + semi-join on colocated tables then group by on partition column",
+ "description": "join with pre-partitioned left and right tables then group-by other columns; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
+ },
+ {
+ "description": "agg + semi-join on pre-partitioned main and side tables then group by on partition column",
"sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
},
+ {
+ "description": "agg + semi-join on pre-partitioned main and side tables then group by on partition column; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+ },
{
"description": "agg + semi-join on pre-partitioned main tables then group by on partition column",
"sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
@@ -335,40 +362,177 @@
"sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.name"
},
{
- "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column on semi table",
+ "description": "agg + semi-join on pre-partitioned main table with group by on partitioned column on pre-partitioned semi table",
"sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0 GROUP BY num HAVING COUNT(*) > 1) GROUP BY {tbl1}.name"
},
{
- "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
+ "description": "agg + semi-join on pre-partitioned main table with group by on partitioned column on pre-partitioned semi table; colocated hint",
+ "ignored": true,
+ "comments": "although both table are partitioned, but the partition function for agg-having semi table is not the same as partition function of the main table, and thus cannot use colocated hint; This is not detected currently and thus will produce incorrect results",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0 GROUP BY num HAVING COUNT(*) > 1) GROUP BY {tbl1}.name"
+ },
+ {
+ "description": "agg + semi-join on pre-partitioned main and side tables with group by on partitioned column with having filter on top of semi join",
"sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
},
{
- "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join",
+ "description": "agg + semi-join on pre-partitioned main and side tables with group by on partitioned column with having filter on top of semi join; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
+ },
+ {
+ "description": "agg + semi-join on pre-partitioned main and side tables with group by on partitioned column with sorting on top of semi join",
"sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.name ORDER BY SUM({tbl1}.val) DESC"
},
{
- "description": "co-partition agg + semi-join with colocated tables & agg hint",
+ "description": "agg + semi-join on pre-partitioned main and side tables with group by on partitioned column with sorting on top of semi join; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.name ORDER BY SUM({tbl1}.val) DESC"
+ },
+ {
+ "description": "partition agg + semi-join with pre-partitioned main and side tables & agg hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
},
{
- "description": "co-partition agg + semi-join with single table partition & agg hint",
+ "description": "partition agg + semi-join with pre-partitioned main and side table & agg hint and colocated hint",
+ "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+ },
+ {
+ "description": "partition agg + semi-join with single table partition & agg hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
},
{
- "description": "co-partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
+ "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
},
{
- "description": "co-partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key",
+ "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join; colocated hint",
+ "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
+ },
+ {
+ "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
},
{
- "description": "semi-join on pre-partitioned main tables with sorting on top of semi join on join key",
+ "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key; colocated hint",
+ "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
+ },
+ {
+ "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join on join key",
"sql": "SELECT {tbl1}.num FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.num DESC"
},
{
- "description": "semi-join on pre-partitioned main tables with sorting on top of semi join with non-join columns",
+ "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join on join key; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.num DESC"
+ },
+ {
+ "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join with non-join columns",
"sql": "SELECT {tbl1}.name, {tbl1}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.val DESC"
+ },
+ {
+ "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join with non-join columns; colocated hint",
+ "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.name, {tbl1}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.val DESC"
+ }
+ ]
+ },
+ "hint_option_rel_trait_partition": {
+ "comments": "this section specifically test behavior of relational trait / partition inferences and aggressively",
+ "tables": {
+ "tbl1": {
+ "schema": [
+ {"name": "num", "type": "INT"},
+ {"name": "name", "type": "STRING"},
+ {"name": "val", "type": "INT"}
+ ],
+ "inputs": [
+ [1, "a", 23],
+ [2, "b", 34],
+ [3, "c", 45],
+ [3, "yyy", 56],
+ [4, "e", 12],
+ [4, "e", 23],
+ [6, "e", 34],
+ [7, "d", 45],
+ [7, "f", 56],
+ [8, "z", 67]
+ ],
+ "partitionColumns": [
+ "num"
+ ]
+ },
+ "tbl2": {
+ "schema": [
+ {"name": "num", "type": "INT"},
+ {"name": "id", "type": "STRING"},
+ {"name": "data", "type": "INT"}
+ ],
+ "inputs": [
+ [1, "xxx", 2],
+ [1, "xxx", 4],
+ [3, "yyy", 5],
+ [3, "zzz", 6],
+ [5, "zzz", 12],
+ [6, "e", 23],
+ [7, "d", 34],
+ [8, "z", 45]
+ ],
+ "partitionColumns": [
+ "num"
+ ]
+ },
+ "tbl3": {
+ "schema": [
+ {"name": "num", "type": "INT"},
+ {"name": "id", "type": "STRING"},
+ {"name": "data", "type": "INT"}
+ ],
+ "inputs": [
+ [1, "xxx", 12],
+ [2, "xxx", 23],
+ [3, "yyy", 34],
+ [4, "yyy", 45],
+ [5, "zzz", 12]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "colocated join, with hint, not on partition key",
+ "sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.val = {tbl2}.data WHERE {tbl2}.data > 0"
+ },
+ {
+ "description": "colocated join, with hint, one-side on partition key",
+ "sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.data WHERE {tbl2}.data > 0"
+ },
+ {
+ "description": "colocated join, with hint, both-side on partition key",
+ "sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0"
+ },
+ {
+ "description": "colocated join then group by, not on partition key, group-by join key",
+ "sql": "SELECT {tbl1}.val, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.val = {tbl2}.data WHERE {tbl2}.data > 0 GROUP BY {tbl1}.val"
+ },
+ {
+ "description": "colocated join then group by, not on partition key, group-by non-join key",
+ "sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.val = {tbl2}.data WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
+ },
+ {
+ "description": "colocated join then group by, one-side on partition key, group-by join key",
+ "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.data WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
+ },
+ {
+ "description": "colocated join then group by, one-side on partition key, group-by non-join key",
+ "sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.data WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
+ },
+ {
+ "description": "colocated join then group by, both-side on partition key, group-by left join key",
+ "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
+ },
+ {
+ "description": "colocated join then group by, both-side on partition key, group-by right join key",
+ "sql": "SELECT {tbl2}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.num"
+ },
+ {
+ "description": "colocated join then group by, both-side on partition key, group-by non-join key",
+ "sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
}
]
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org