You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by hy...@apache.org on 2020/07/14 20:25:52 UTC
[calcite] branch master updated: [CALCITE-3916] Support top-down
rule applying and upper bound space pruning
This is an automated email from the ASF dual-hosted git repository.
hyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 33aa61c [CALCITE-3916] Support top-down rule applying and upper bound space pruning
33aa61c is described below
commit 33aa61ca404018cc9fe8ad2ec2c02ba269c67ebe
Author: jinpeng.wjp <ji...@alibaba-inc.com>
AuthorDate: Tue May 26 17:49:41 2020 +0800
[CALCITE-3916] Support top-down rule applying and upper bound space pruning
Close #1991
---
.../calcite/interpreter/BindableConvention.java | 5 +
.../org/apache/calcite/plan/RelOptRuleOperand.java | 2 +-
.../calcite/plan/volcano/IterativeRuleDriver.java | 88 ++
.../{RuleQueue.java => IterativeRuleQueue.java} | 84 +-
.../apache/calcite/plan/volcano/OptimizeTask.java | 372 ---------
.../org/apache/calcite/plan/volcano/RelSet.java | 67 +-
.../org/apache/calcite/plan/volcano/RelSubset.java | 87 +-
.../apache/calcite/plan/volcano/RuleDriver.java | 53 ++
.../org/apache/calcite/plan/volcano/RuleQueue.java | 280 +------
.../calcite/plan/volcano/TopDownRuleDriver.java | 898 +++++++++++++++++++++
.../calcite/plan/volcano/TopDownRuleQueue.java | 88 ++
.../calcite/plan/volcano/VolcanoPlanner.java | 196 +++--
.../calcite/rel/metadata/BuiltInMetadata.java | 17 +-
.../rel/metadata/DefaultRelMetadataProvider.java | 1 +
.../calcite/rel/metadata/RelMdLowerBoundCost.java | 77 ++
.../calcite/rel/metadata/RelMetadataQuery.java | 18 +
.../java/org/apache/calcite/tools/Programs.java | 3 +-
.../org/apache/calcite/util/BuiltInMethod.java | 4 +
.../calcite/plan/volcano/VolcanoPlannerTest.java | 13 +-
.../org/apache/calcite/test/JdbcAdapterTest.java | 3 +
.../org/apache/calcite/test/TopDownOptTest.xml | 92 +--
.../calcite/adapter/kafka/KafkaAdapterTest.java | 2 +
22 files changed, 1577 insertions(+), 873 deletions(-)
diff --git a/core/src/main/java/org/apache/calcite/interpreter/BindableConvention.java b/core/src/main/java/org/apache/calcite/interpreter/BindableConvention.java
index bae22e9..31a4cba 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/BindableConvention.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/BindableConvention.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
/**
* Calling convention that returns results as an
@@ -51,6 +52,10 @@ public enum BindableConvention implements Convention {
return "BINDABLE";
}
+ @Override public RelNode enforce(RelNode input, RelTraitSet required) {
+ return null;
+ }
+
public RelTraitDef getTraitDef() {
return ConventionTraitDef.INSTANCE;
}
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptRuleOperand.java b/core/src/main/java/org/apache/calcite/plan/RelOptRuleOperand.java
index 7515140..913db6f 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptRuleOperand.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptRuleOperand.java
@@ -47,7 +47,7 @@ public class RelOptRuleOperand {
public int[] solveOrder;
public int ordinalInParent;
public int ordinalInRule;
- private final RelTrait trait;
+ public final RelTrait trait;
private final Class<? extends RelNode> clazz;
private final ImmutableList<RelOptRuleOperand> children;
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleDriver.java b/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleDriver.java
new file mode 100644
index 0000000..3ff8300
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleDriver.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.trace.CalciteTrace;
+
+import org.slf4j.Logger;
+
+/**
+ * <p>The algorithm executes repeatedly in a series of phases. In each phase
+ * the exact rules that may be fired varies. The mapping of phases to rule
+ * sets is maintained in the {@link #ruleQueue}.
+ *
+ * <p>In each phase, the planner then iterates over the rule matches presented
+ * by the rule queue until the rule queue becomes empty.
+ */
+class IterativeRuleDriver implements RuleDriver {
+
+ private static final Logger LOGGER = CalciteTrace.getPlannerTracer();
+
+ private final VolcanoPlanner planner;
+ private final IterativeRuleQueue ruleQueue;
+
+ IterativeRuleDriver(VolcanoPlanner planner) {
+ this.planner = planner;
+ ruleQueue = new IterativeRuleQueue(planner);
+ }
+
+ @Override public IterativeRuleQueue getRuleQueue() {
+ return ruleQueue;
+ }
+
+ @Override public void drive() {
+ PLANNING:
+ for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
+ while (true) {
+ LOGGER.debug("PLANNER = {}; PHASE = {}; COST = {}",
+ this, phase.toString(), planner.root.bestCost);
+
+ VolcanoRuleMatch match = ruleQueue.popMatch(phase);
+ if (match == null) {
+ break;
+ }
+
+ assert match.getRule().matches(match);
+ try {
+ match.onMatch();
+ } catch (VolcanoTimeoutException e) {
+ LOGGER.warn("Volcano planning times out, cancels the subsequent optimization.");
+ planner.canonize();
+ ruleQueue.phaseCompleted(phase);
+ break PLANNING;
+ }
+
+ // The root may have been merged with another
+ // subset. Find the new root subset.
+ planner.canonize();
+ }
+
+ ruleQueue.phaseCompleted(phase);
+ }
+ }
+
+ @Override public void onProduce(RelNode rel, RelSubset subset) {
+ }
+
+ @Override public void onSetMerged(RelSet set) {
+ }
+
+ @Override public void clear() {
+ ruleQueue.clear();
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java b/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleQueue.java
similarity index 78%
copy from core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
copy to core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleQueue.java
index 06b9587..531df45 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleQueue.java
@@ -16,10 +16,7 @@
*/
package org.apache.calcite.plan.volcano;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.SubstitutionRule;
-import org.apache.calcite.util.Util;
import org.apache.calcite.util.trace.CalciteTrace;
import com.google.common.collect.HashMultimap;
@@ -30,8 +27,6 @@ import org.slf4j.Logger;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.ArrayDeque;
-import java.util.Deque;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -43,7 +38,7 @@ import java.util.Set;
* Priority queue of relexps whose rules have not been called, and rule-matches
* which have not yet been acted upon.
*/
-class RuleQueue {
+class IterativeRuleQueue extends RuleQueue {
//~ Static fields/initializers ---------------------------------------------
private static final Logger LOGGER = CalciteTrace.getPlannerTracer();
@@ -63,8 +58,6 @@ class RuleQueue {
final Map<VolcanoPlannerPhase, PhaseMatchList> matchListMap =
new EnumMap<>(VolcanoPlannerPhase.class);
- private final VolcanoPlanner planner;
-
/**
* Maps a {@link VolcanoPlannerPhase} to a set of rule descriptions. Named rules
* may be invoked in their corresponding phase.
@@ -76,8 +69,8 @@ class RuleQueue {
//~ Constructors -----------------------------------------------------------
- RuleQueue(VolcanoPlanner planner) {
- this.planner = planner;
+ IterativeRuleQueue(VolcanoPlanner planner) {
+ super(planner);
phaseRuleMapping = new EnumMap<>(VolcanoPlannerPhase.class);
@@ -106,10 +99,15 @@ class RuleQueue {
/**
* Clear internal data structure for this rule queue.
*/
- public void clear() {
+ @Override public boolean clear() {
+ boolean empty = true;
for (PhaseMatchList matchList : matchListMap.values()) {
+ if (!matchList.queue.isEmpty() || !matchList.preQueue.isEmpty()) {
+ empty = false;
+ }
matchList.clear();
}
+ return !empty;
}
/**
@@ -125,7 +123,7 @@ class RuleQueue {
* existing {@link PhaseMatchList per-phase rule-match lists} which allow
* the rule referenced by the match.
*/
- void addMatch(VolcanoRuleMatch match) {
+ public void addMatch(VolcanoRuleMatch match) {
final String matchName = match.toString();
for (PhaseMatchList matchList : matchListMap.values()) {
Set<String> phaseRuleSet = phaseRuleMapping.get(matchList.phase);
@@ -163,7 +161,7 @@ class RuleQueue {
* previously marked as completed via
* {@link #phaseCompleted(VolcanoPlannerPhase)}.
*/
- VolcanoRuleMatch popMatch(VolcanoPlannerPhase phase) {
+ public VolcanoRuleMatch popMatch(VolcanoPlannerPhase phase) {
dumpPlannerState();
PhaseMatchList phaseMatchList = matchListMap.get(phase);
@@ -233,66 +231,6 @@ class RuleQueue {
}
}
- /** Returns whether to skip a match. This happens if any of the
- * {@link RelNode}s have importance zero. */
- private boolean skipMatch(VolcanoRuleMatch match) {
- for (RelNode rel : match.rels) {
- if (planner.prunedNodes.contains(rel)) {
- return true;
- }
- }
-
- // If the same subset appears more than once along any path from root
- // operand to a leaf operand, we have matched a cycle. A relational
- // expression that consumes its own output can never be implemented, and
- // furthermore, if we fire rules on it we may generate lots of garbage.
- // For example, if
- // Project(A, X = X + 0)
- // is in the same subset as A, then we would generate
- // Project(A, X = X + 0 + 0)
- // Project(A, X = X + 0 + 0 + 0)
- // also in the same subset. They are valid but useless.
- final Deque<RelSubset> subsets = new ArrayDeque<>();
- try {
- checkDuplicateSubsets(subsets, match.rule.getOperand(), match.rels);
- } catch (Util.FoundOne e) {
- return true;
- }
- return false;
- }
-
- /** Recursively checks whether there are any duplicate subsets along any path
- * from root of the operand tree to one of the leaves.
- *
- * <p>It is OK for a match to have duplicate subsets if they are not on the
- * same path. For example,
- *
- * <blockquote><pre>
- * Join
- * / \
- * X X
- * </pre></blockquote>
- *
- * <p>is a valid match.
- *
- * @throws org.apache.calcite.util.Util.FoundOne on match
- */
- private void checkDuplicateSubsets(Deque<RelSubset> subsets,
- RelOptRuleOperand operand, RelNode[] rels) {
- final RelSubset subset = planner.getSubset(rels[operand.ordinalInRule]);
- if (subsets.contains(subset)) {
- throw Util.FoundOne.NULL;
- }
- if (!operand.getChildOperands().isEmpty()) {
- subsets.push(subset);
- for (RelOptRuleOperand childOperand : operand.getChildOperands()) {
- checkDuplicateSubsets(subsets, childOperand, rels);
- }
- final RelSubset x = subsets.pop();
- assert x == subset;
- }
- }
-
//~ Inner Classes ----------------------------------------------------------
/**
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java b/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java
deleted file mode 100644
index c72e2b8..0000000
--- a/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.plan.volcano;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.DeriveMode;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.util.trace.CalciteTrace;
-
-import org.apiguardian.api.API;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * <code>OptimizeTask</code> represents the optimization task
- * of VolcanoPlanner.
- *
- * <p>How does it work?</p>
- *
- * <p>Let S# denote the seed physical operator in a RelSet after
- * logical and physical rules transformation, P# denote the
- * physical operator generated by passing down parent trait
- * requirements, D# denote the physical operator generated by
- * deriving from child delivered traitSets.</p>
- *
- * The initial rel list state in a RelSet is as follows:
- * <pre>
- * cursor
- * |
- * V
- * S1, S2
- * </pre>
- *
- * When we create a task for RelSubset1, the task will immediately
- * pass the subset's traitSet to seed operators, S1 and S2,
- * now we have:
- * <pre>
- * cursor
- * |
- * V
- * S1, S2, P1, P2
- * </pre>
- *
- * The subset task will create a optimization task for the relnode
- * pointed by cursor, and move cursor to next available physical
- * operator S2. In the task for S1, it will continue optimize its
- * child nodes, which are RelSubsets. After child inputs optimization
- * is finished, S1 will derive new relnodes from delivered subsets
- * in input RelSet. Once task for S1 is completed, we have:
- * <pre>
- * cursor
- * |
- * V
- * S1, S2, P1, P2, D1
- * </pre>
- *
- * The subset task continues scheduling task for S2, P1... until
- * there is no more relnode created for the RelSet, then we have:
- * <pre>
- * cursor
- * |
- * V
- * S1, S2, P1, P2, D1, D2, D3, null
- * </pre>
- *
- * When a task for another RelSubset2 is created, the task will try
- * to pass down the subset's traitSet to seed operator S1 and S2,
- * now the RelSet looks like:
- * <pre>
- * cursor
- * |
- * V
- * S1, S2, P1, P2, D1, D2, D3, P3, P4
- * </pre>
- *
- * The process continues till there is no more subsets or relnodes
- * created for the RelSet.
- */
-@API(since = "1.23", status = API.Status.INTERNAL)
-abstract class OptimizeTask {
-
- static final Logger LOGGER = CalciteTrace.getPlannerTaskTracer();
-
- static OptimizeTask create(RelNode node) {
- if (node instanceof RelSubset) {
- return new RelSubsetOptTask((RelSubset) node);
- }
- return new RelNodeOptTask(node);
- }
-
- final VolcanoPlanner planner;
- final int id;
-
- OptimizeTask(RelNode node) {
- planner = (VolcanoPlanner) node.getCluster().getPlanner();
- id = planner.nextTaskId++;
- LOGGER.debug("Scheduled task(id={}) for {}", id, node);
- }
-
- abstract boolean hasSubTask();
-
- abstract OptimizeTask nextSubTask();
-
- abstract void execute();
-
- /**
- * Task State
- */
- public enum State {
- SCHEDULED,
- EXECUTING,
- COMPLETED
- }
-
- /**
- * Task for optimizing RelNode.
- *
- * <p>Optimize input RelSubsets and derive new RelNodes
- * from child traitSets.</p>
- */
- static class RelNodeOptTask extends OptimizeTask {
- final RelNode node;
- int nextId = 0; // next child index
-
- RelNodeOptTask(RelNode node) {
- super(node);
- this.node = node;
- }
-
- @Override boolean hasSubTask() {
- int size = node.getInputs().size();
- while (nextId < size) {
- RelSubset subset = (RelSubset) node.getInput(nextId);
- if (subset.taskState == null) {
- // not yet scheduled
- return true;
- } else {
- // maybe a cycle if it is not completed
- nextId++;
- }
- }
-
- return false;
- }
-
- @Override OptimizeTask nextSubTask() {
- RelNode child = node.getInput(nextId++);
- return new RelSubsetOptTask((RelSubset) child);
- }
-
- @Override void execute() {
- if (!(node instanceof PhysicalNode)
- || ((PhysicalNode) node).getDeriveMode() == DeriveMode.PROHIBITED
- || !planner.isSeedNode(node)) {
- LOGGER.debug("Completed task(id={}) for {}", id, node);
- return;
- }
-
- PhysicalNode rel = (PhysicalNode) node;
- DeriveMode mode = rel.getDeriveMode();
- int arity = node.getInputs().size();
- // for OMAKASE
- List<List<RelTraitSet>> inputTraits = new ArrayList<>(arity);
-
- for (int i = 0; i < arity; i++) {
- int childId = i;
- if (mode == DeriveMode.RIGHT_FIRST) {
- childId = arity - i - 1;
- }
-
- RelSubset input = (RelSubset) node.getInput(childId);
- List<RelTraitSet> traits = new ArrayList<>();
- inputTraits.add(traits);
-
- final int numSubset = input.set.subsets.size();
- for (int j = 0; j < numSubset; j++) {
- RelSubset subset = input.set.subsets.get(j);
- if (!subset.isDelivered() || equalsSansConvention(
- subset.getTraitSet(), rel.getCluster().traitSet())) {
- // Ideally we should stop deriving new relnodes when the
- // subset's traitSet equals with input traitSet, but
- // in case someone manually builds a physical relnode
- // tree, which is highly discouraged, without specifying
- // correct traitSet, e.g.
- // EnumerableFilter [].ANY
- // -> EnumerableMergeJoin [a].Hash[a]
- // We should still be able to derive the correct traitSet
- // for the dumb filter, even though the filter's traitSet
- // should be derived from the MergeJoin when it is created.
- // But if the subset's traitSet equals with the default
- // empty traitSet sans convention (the default traitSet
- // from cluster may have logical convention, NONE, which
- // is not interesting), we are safe to ignore it, because
- // a physical filter with non default traitSet, but has a
- // input with default empty traitSet, e.g.
- // EnumerableFilter [a].Hash[a]
- // -> EnumerableProject [].ANY
- // is definitely wrong, we should fail fast.
- continue;
- }
-
- if (mode == DeriveMode.OMAKASE) {
- traits.add(subset.getTraitSet());
- } else {
- RelNode newRel = rel.derive(subset.getTraitSet(), childId);
- if (newRel != null && !planner.isRegistered(newRel)) {
- RelNode newInput = newRel.getInput(childId);
- assert newInput instanceof RelSubset;
- if (newInput == subset) {
- // If the child subset is used to derive new traits for
- // current relnode, the subset will be marked REQUIRED
- // when registering the new derived relnode and later
- // will add enforcers between other delivered subsets.
- // e.g. a MergeJoin request both inputs hash distributed
- // by [a,b] sorted by [a,b]. If the left input R1 happens to
- // be distributed by [a], the MergeJoin can derive new
- // traits from this input and request both input to be
- // distributed by [a] sorted by [a,b]. In case there is a
- // alternative R2 with ANY distribution in the left input's
- // RelSet, we may end up with requesting hash distribution
- // [a] on alternative R2, which is unnecessary and waste,
- // because we request distribution by [a] because of R1 can
- // deliver the exact same distribution and we don't need to
- // enforce properties on other subsets that can't satisfy
- // the specific trait requirement.
- // Here we add a constraint that {@code newInput == subset},
- // because if the delivered child subset is HASH[a], but
- // we require HASH[a].SORT[a,b], we still need to enable
- // property enforcement on the required subset. Otherwise,
- // we need to restrict enforcement between HASH[a].SORT[a,b]
- // and HASH[a] only, which will make things a little bit
- // complicated. We might optimize it in the future.
- subset.disableEnforcing();
- }
- RelSubset relSubset = planner.register(newRel, node);
- assert relSubset.set == planner.getSubset(node).set;
- }
- }
- }
-
- if (mode == DeriveMode.LEFT_FIRST
- || mode == DeriveMode.RIGHT_FIRST) {
- break;
- }
- }
-
- if (mode == DeriveMode.OMAKASE) {
- List<RelNode> relList = rel.derive(inputTraits);
- for (RelNode relNode : relList) {
- if (!planner.isRegistered(relNode)) {
- planner.register(relNode, node);
- }
- }
- }
-
- LOGGER.debug("Completed task(id={}) for {}", id, node);
- }
-
- /**
- * Returns whether the 2 traitSets are equal without Convention.
- * It assumes they have the same traitDefs order.
- */
- private boolean equalsSansConvention(RelTraitSet ts1, RelTraitSet ts2) {
- assert ts1.size() == ts2.size();
- for (int i = 0; i < ts1.size(); i++) {
- RelTrait trait = ts1.getTrait(i);
- if (trait.getTraitDef() == ConventionTraitDef.INSTANCE) {
- continue;
- }
- if (!trait.equals(ts2.getTrait(i))) {
- return false;
- }
- }
- return true;
- }
-
- @Override public String toString() {
- return "Task#" + id + ":{ " + node + " }";
- }
- }
-
- /**
- * Task for optimizing RelSubset.
- *
- * <p>Pass down the trait requirements of current RelSubset
- * and add enforcers to the new delivered subsets.</p>
- */
- static class RelSubsetOptTask extends OptimizeTask {
- final RelSubset subset;
- final Set<RelSubset> deliveredSubsets = new HashSet<>();
-
- RelSubsetOptTask(RelSubset subset) {
- super(subset);
- this.subset = subset;
- subset.taskState = State.SCHEDULED;
- propagateTraits();
- }
-
- private void propagateTraits() {
- int size = Math.min(subset.set.getSeedSize(),
- subset.set.rels.size());
-
- for (int i = 0; i < size; i++) {
- RelNode rel = subset.set.rels.get(i);
- if (!(rel instanceof PhysicalNode)
- || rel.getConvention() == Convention.NONE
- || rel.getTraitSet().satisfies(subset.getTraitSet())) {
- continue;
- }
-
- RelNode node = ((PhysicalNode) rel).passThrough(
- subset.getTraitSet());
- if (node != null && !planner.isRegistered(node)) {
- RelSubset newSubset = planner.register(node, subset);
- deliveredSubsets.add(newSubset);
- } else {
- // TODO: should we consider stop trying propagation on node
- // with the same traitset as phyNode?
- assert true;
- }
- }
- }
-
- @Override boolean hasSubTask() {
- return subset.set.hasNextPhysicalNode();
- }
-
- @Override OptimizeTask nextSubTask() {
- RelNode rel = subset.set.nextPhysicalNode();
- return new RelNodeOptTask(rel);
- }
-
- @Override void execute() {
- subset.taskState = State.EXECUTING;
-
- subset.set.addConverters(subset, true, false);
-
- for (RelSubset delivered : deliveredSubsets) {
- subset.set.addConverters(delivered, false, false);
- }
-
- subset.taskState = State.COMPLETED;
- LOGGER.debug("Completed task(id={}) for {}", id, subset);
- }
-
- @Override public String toString() {
- return "Task#" + id + ":{ " + subset + " }";
- }
- }
-}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java b/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
index 5d20104..bf4fe5d 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
@@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.Converter;
import org.apache.calcite.rel.core.CorrelationId;
@@ -76,15 +75,9 @@ class RelSet {
RelNode rel;
/**
- * The position indicator of rel node that is to be processed.
+ * Exploring state of current RelSet.
*/
- private int relCursor = 0;
-
- /**
- * The relnodes after applying logical rules and physical rules,
- * before trait propagation and enforcement.
- */
- final Set<RelNode> seeds = new HashSet<>();
+ ExploringState exploringState;
/**
* Records conversions / enforcements that have happened on the
@@ -166,32 +159,6 @@ class RelSet {
return null;
}
- public int getSeedSize() {
- if (seeds.isEmpty()) {
- seeds.addAll(rels);
- }
- return seeds.size();
- }
-
- public boolean hasNextPhysicalNode() {
- while (relCursor < rels.size()) {
- RelNode node = rels.get(relCursor);
- if (node instanceof PhysicalNode
- && node.getConvention() != Convention.NONE) {
- // enforcer may be manually created for some reason
- if (relCursor < getSeedSize() || !node.isEnforcer()) {
- return true;
- }
- }
- relCursor++;
- }
- return false;
- }
-
- public RelNode nextPhysicalNode() {
- return rels.get(relCursor++);
- }
-
/**
* Removes all references to a specific {@link RelNode} in both the subsets
* and their parent relationships.
@@ -315,8 +282,8 @@ class RelSet {
subset.setDelivered();
}
- if (needsConverter && !planner.topDownOpt) {
- addConverters(subset, required, true);
+ if (needsConverter) {
+ addConverters(subset, required, !planner.topDownOpt);
}
return subset;
@@ -410,6 +377,13 @@ class RelSet {
subset = getOrCreateSubset(cluster, otherTraits, true);
}
+ assert subset != null;
+ if (subset.passThroughCache == null) {
+ subset.passThroughCache = otherSubset.passThroughCache;
+ } else if (otherSubset.passThroughCache != null) {
+ subset.passThroughCache.addAll(otherSubset.passThroughCache);
+ }
+
// collect RelSubset instances, whose best should be changed
if (otherSubset.bestCost.isLt(subset.bestCost)) {
changedSubsets.put(subset, otherSubset.best);
@@ -488,4 +462,23 @@ class RelSet {
planner.fireRules(subset);
}
}
+
+ //~ Inner Classes ----------------------------------------------------------
+
+ /**
+ * An enum representing exploring state of current RelSet.
+ */
+ enum ExploringState {
+ /**
+ * The RelSet is exploring.
+ * It means all possible rule matches are scheduled, but not fully applied.
+ * This RelSet will refuse to explore again, but cannot provide a valid lower bound (LB).
+ */
+ EXPLORING,
+
+ /**
+ * The RelSet is fully explored and is able to provide a valid lower bound (LB).
+ */
+ EXPLORED
+ }
}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java b/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
index afecdce..ad5bda8 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.CorrelationId;
@@ -37,6 +38,8 @@ import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.trace.CalciteTrace;
+import com.google.common.collect.Sets;
+
import org.apiguardian.api.API;
import org.slf4j.Logger;
@@ -84,7 +87,7 @@ public class RelSubset extends AbstractRelNode {
/**
* Optimization task state
*/
- OptimizeTask.State taskState;
+ OptimizeState taskState;
/**
* cost of best known plan (it may have improved since)
@@ -128,6 +131,17 @@ public class RelSubset extends AbstractRelNode {
*/
private boolean enforceDisabled = false;
+ /**
+ * The upper bound of the last OptimizeGroup call.
+ */
+ RelOptCost upperBound;
+
+ /**
+ * A cache that records which RelNodes have already invoked the passThrough
+ * method, so as to avoid duplicate invocations.
+ */
+ Set<RelNode> passThroughCache;
+
//~ Constructors -----------------------------------------------------------
RelSubset(
@@ -138,6 +152,7 @@ public class RelSubset extends AbstractRelNode {
this.set = set;
assert traits.allSimple();
computeBestCost(cluster.getPlanner());
+ upperBound = bestCost;
}
//~ Methods ----------------------------------------------------------------
@@ -414,6 +429,7 @@ public class RelSubset extends AbstractRelNode {
bestCost = cost;
best = rel;
+ upperBound = bestCost;
// since best was changed, cached metadata for this subset should be removed
mq.clearCache(this);
@@ -489,6 +505,70 @@ public class RelSubset extends AbstractRelNode {
.filter(s -> traitSet.satisfies(s.getTraitSet()));
}
+ /**
+ * Returns the best cost if this subset is fully optimized
+ * or null if the subset is not fully optimized.
+ */
+ @API(since = "1.24", status = API.Status.INTERNAL)
+ public RelOptCost getWinnerCost() {
+ if (taskState == OptimizeState.COMPLETED && bestCost.isLe(upperBound)) {
+ return bestCost;
+ }
+ // otherwise: not fully optimized yet, or bestCost exceeds upperBound (optimize failed)
+ return null;
+ }
+
+ void startOptimize(RelOptCost ub) {
+ assert getWinnerCost() == null : this + " is already optimized";
+ if (upperBound.isLt(ub)) {
+ upperBound = ub;
+ if (bestCost.isLt(upperBound)) {
+ upperBound = bestCost;
+ }
+ }
+ taskState = OptimizeState.OPTIMIZING;
+ }
+
+ void setOptimized() {
+ taskState = OptimizeState.COMPLETED;
+ }
+
+ boolean resetTaskState() {
+ boolean optimized = taskState != null;
+ taskState = null;
+ upperBound = bestCost;
+ return optimized;
+ }
+
+ RelNode passThrough(RelNode rel) {
+ if (!(rel instanceof PhysicalNode)) {
+ return null;
+ }
+ if (passThroughCache == null) {
+ passThroughCache = Sets.newIdentityHashSet();
+ passThroughCache.add(rel);
+ } else if (!passThroughCache.add(rel)) {
+ return null;
+ }
+ return ((PhysicalNode) rel).passThrough(this.getTraitSet());
+ }
+
+ boolean isExplored() {
+ return set.exploringState == RelSet.ExploringState.EXPLORED;
+ }
+
+ boolean explore() {
+ if (set.exploringState != null) {
+ return false;
+ }
+ set.exploringState = RelSet.ExploringState.EXPLORING;
+ return true;
+ }
+
+ void setExplored() {
+ set.exploringState = RelSet.ExploringState.EXPLORED;
+ }
+
//~ Inner Classes ----------------------------------------------------------
/**
@@ -700,4 +780,9 @@ public class RelSubset extends AbstractRelNode {
return p;
}
}
+
+ enum OptimizeState {
+ OPTIMIZING,
+ COMPLETED
+ }
}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RuleDriver.java b/core/src/main/java/org/apache/calcite/plan/volcano/RuleDriver.java
new file mode 100644
index 0000000..f32279a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RuleDriver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * A rule driver applies rules according to a specific algorithm.
+ */
+interface RuleDriver {
+
+ /**
+ * Gets the rule queue.
+ */
+ RuleQueue getRuleQueue();
+
+ /**
+ * Apply rules.
+ */
+ void drive();
+
+ /**
+ * Callback when new RelNodes are added into RelSet.
+ * @param rel the new RelNode
+ * @param subset subset to add
+ */
+ void onProduce(RelNode rel, RelSubset subset);
+
+ /**
+ * Callback when RelSets are merged.
+ * @param set the merged result set
+ */
+ void onSetMerged(RelSet set);
+
+ /**
+ * Clear this RuleDriver.
+ */
+ void clear();
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java b/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
index 06b9587..ac683cb 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
@@ -18,224 +18,41 @@ package org.apache.calcite.plan.volcano;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.rules.SubstitutionRule;
import org.apache.calcite.util.Util;
-import org.apache.calcite.util.trace.CalciteTrace;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-
-import org.slf4j.Logger;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.ArrayDeque;
import java.util.Deque;
-import java.util.EnumMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
/**
- * Priority queue of relexps whose rules have not been called, and rule-matches
- * which have not yet been acted upon.
+ * A data structure that manages rule matches for a RuleDriver.
+ * Different RuleDrivers require different ways to pop matches,
+ * and thus different ways to store rule matches that have not yet been called.
*/
-class RuleQueue {
- //~ Static fields/initializers ---------------------------------------------
-
- private static final Logger LOGGER = CalciteTrace.getPlannerTracer();
-
- private static final Set<String> ALL_RULES = ImmutableSet.of("<ALL RULES>");
-
- //~ Instance fields --------------------------------------------------------
-
- /**
- * Map of {@link VolcanoPlannerPhase} to a list of rule-matches. Initially,
- * there is an empty {@link PhaseMatchList} for each planner phase. As the
- * planner invokes {@link #addMatch(VolcanoRuleMatch)} the rule-match is
- * added to the appropriate PhaseMatchList(s). As the planner completes
- * phases, the matching entry is removed from this list to avoid unused
- * work.
- */
- final Map<VolcanoPlannerPhase, PhaseMatchList> matchListMap =
- new EnumMap<>(VolcanoPlannerPhase.class);
-
- private final VolcanoPlanner planner;
-
- /**
- * Maps a {@link VolcanoPlannerPhase} to a set of rule descriptions. Named rules
- * may be invoked in their corresponding phase.
- *
- * <p>See {@link VolcanoPlannerPhaseRuleMappingInitializer} for more
- * information regarding the contents of this Map and how it is initialized.
- */
- private final Map<VolcanoPlannerPhase, Set<String>> phaseRuleMapping;
+public abstract class RuleQueue {
- //~ Constructors -----------------------------------------------------------
+ protected final VolcanoPlanner planner;
- RuleQueue(VolcanoPlanner planner) {
+ protected RuleQueue(VolcanoPlanner planner) {
this.planner = planner;
-
- phaseRuleMapping = new EnumMap<>(VolcanoPlannerPhase.class);
-
- // init empty sets for all phases
- for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
- phaseRuleMapping.put(phase, new HashSet<>());
- }
-
- // configure phases
- planner.getPhaseRuleMappingInitializer().initialize(phaseRuleMapping);
-
- for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
- // empty phases get converted to "all rules"
- if (phaseRuleMapping.get(phase).isEmpty()) {
- phaseRuleMapping.put(phase, ALL_RULES);
- }
-
- // create a match list data structure for each phase
- PhaseMatchList matchList = new PhaseMatchList(phase);
-
- matchListMap.put(phase, matchList);
- }
}
- //~ Methods ----------------------------------------------------------------
/**
- * Clear internal data structure for this rule queue.
- */
- public void clear() {
- for (PhaseMatchList matchList : matchListMap.values()) {
- matchList.clear();
- }
- }
-
- /**
- * Removes the {@link PhaseMatchList rule-match list} for the given planner
- * phase.
- */
- public void phaseCompleted(VolcanoPlannerPhase phase) {
- matchListMap.get(phase).clear();
- }
-
- /**
- * Adds a rule match. The rule-matches are automatically added to all
- * existing {@link PhaseMatchList per-phase rule-match lists} which allow
- * the rule referenced by the match.
- */
- void addMatch(VolcanoRuleMatch match) {
- final String matchName = match.toString();
- for (PhaseMatchList matchList : matchListMap.values()) {
- Set<String> phaseRuleSet = phaseRuleMapping.get(matchList.phase);
- if (phaseRuleSet != ALL_RULES) {
- String ruleDescription = match.getRule().toString();
- if (!phaseRuleSet.contains(ruleDescription)) {
- continue;
- }
- }
-
- if (!matchList.names.add(matchName)) {
- // Identical match has already been added.
- continue;
- }
-
- LOGGER.trace("{} Rule-match queued: {}", matchList.phase.toString(), matchName);
-
- matchList.offer(match);
-
- matchList.matchMap.put(
- planner.getSubset(match.rels[0]), match);
- }
- }
-
- /**
- * Removes the rule match from the head of match list, and returns it.
- *
- * <p>Returns {@code null} if there are no more matches.</p>
- *
- * <p>Note that the VolcanoPlanner may still decide to reject rule matches
- * which have become invalid, say if one of their operands belongs to an
- * obsolete set or has importance=0.
- *
- * @throws java.lang.AssertionError if this method is called with a phase
- * previously marked as completed via
- * {@link #phaseCompleted(VolcanoPlannerPhase)}.
+ * Add a RuleMatch into the queue.
+ * @param match rule match to add
*/
- VolcanoRuleMatch popMatch(VolcanoPlannerPhase phase) {
- dumpPlannerState();
-
- PhaseMatchList phaseMatchList = matchListMap.get(phase);
- if (phaseMatchList == null) {
- throw new AssertionError("Used match list for phase " + phase
- + " after phase complete");
- }
-
- VolcanoRuleMatch match;
- for (;;) {
- if (phaseMatchList.size() == 0) {
- return null;
- }
-
- dumpRuleQueue(phaseMatchList);
-
- match = phaseMatchList.poll();
-
- if (skipMatch(match)) {
- LOGGER.debug("Skip match: {}", match);
- } else {
- break;
- }
- }
-
- // If sets have merged since the rule match was enqueued, the match
- // may not be removed from the matchMap because the subset may have
- // changed, it is OK to leave it since the matchMap will be cleared
- // at the end.
- phaseMatchList.matchMap.remove(
- planner.getSubset(match.rels[0]), match);
-
- LOGGER.debug("Pop match: {}", match);
- return match;
- }
+ public abstract void addMatch(VolcanoRuleMatch match);
/**
- * Dumps rules queue to the logger when debug level is set to {@code TRACE}.
+ * Clears this rule queue.
+ * The return value indicates whether the rule queue was non-empty before it was cleared.
+ * @return true if the rule queue was not empty
*/
- private void dumpRuleQueue(PhaseMatchList phaseMatchList) {
- if (LOGGER.isTraceEnabled()) {
- StringBuilder b = new StringBuilder();
- b.append("Rule queue:");
- for (VolcanoRuleMatch rule : phaseMatchList.preQueue) {
- b.append("\n");
- b.append(rule);
- }
- for (VolcanoRuleMatch rule : phaseMatchList.queue) {
- b.append("\n");
- b.append(rule);
- }
- LOGGER.trace(b.toString());
- }
- }
+ public abstract boolean clear();
- /**
- * Dumps planner's state to the logger when debug level is set to {@code TRACE}.
- */
- private void dumpPlannerState() {
- if (LOGGER.isTraceEnabled()) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- planner.dump(pw);
- pw.flush();
- LOGGER.trace(sw.toString());
- planner.getRoot().getCluster().invalidateMetadataQuery();
- }
- }
/** Returns whether to skip a match. This happens if any of the
* {@link RelNode}s have importance zero. */
- private boolean skipMatch(VolcanoRuleMatch match) {
+ protected boolean skipMatch(VolcanoRuleMatch match) {
for (RelNode rel : match.rels) {
if (planner.prunedNodes.contains(rel)) {
return true;
@@ -292,73 +109,4 @@ class RuleQueue {
assert x == subset;
}
}
-
- //~ Inner Classes ----------------------------------------------------------
-
- /**
- * PhaseMatchList represents a set of {@link VolcanoRuleMatch rule-matches}
- * for a particular
- * {@link VolcanoPlannerPhase phase of the planner's execution}.
- */
- private static class PhaseMatchList {
- /**
- * The VolcanoPlannerPhase that this PhaseMatchList is used in.
- */
- final VolcanoPlannerPhase phase;
-
- /**
- * Rule match queue for SubstitutionRule
- */
- private final Queue<VolcanoRuleMatch> preQueue = new LinkedList<>();
-
- /**
- * Current list of VolcanoRuleMatches for this phase. New rule-matches
- * are appended to the end of this queue.
- * The rules are not sorted in any way.
- */
- private final Queue<VolcanoRuleMatch> queue = new LinkedList<>();
-
- /**
- * A set of rule-match names contained in {@link #queue}. Allows fast
- * detection of duplicate rule-matches.
- */
- final Set<String> names = new HashSet<>();
-
- /**
- * Multi-map of RelSubset to VolcanoRuleMatches.
- */
- final Multimap<RelSubset, VolcanoRuleMatch> matchMap =
- HashMultimap.create();
-
- PhaseMatchList(VolcanoPlannerPhase phase) {
- this.phase = phase;
- }
-
- int size() {
- return preQueue.size() + queue.size();
- }
-
- VolcanoRuleMatch poll() {
- VolcanoRuleMatch match = preQueue.poll();
- if (match == null) {
- match = queue.poll();
- }
- return match;
- }
-
- void offer(VolcanoRuleMatch match) {
- if (match.getRule() instanceof SubstitutionRule) {
- preQueue.offer(match);
- } else {
- queue.offer(match);
- }
- }
-
- void clear() {
- preQueue.clear();
- queue.clear();
- names.clear();
- matchMap.clear();
- }
- }
}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java b/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java
new file mode 100644
index 0000000..0443159
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java
@@ -0,0 +1,898 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.DeriveMode;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.trace.CalciteTrace;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+import java.util.function.Predicate;
+
+class TopDownRuleDriver implements RuleDriver {
+
+ private static final Logger LOGGER = CalciteTrace.getPlannerTaskTracer();
+
+ private final VolcanoPlanner planner;
+
+ /**
+ * The rule queue designed for top-down rule applying.
+ */
+ private final TopDownRuleQueue ruleQueue;
+
+ /**
+ * All tasks waiting for execution.
+ */
+ private Stack<Task> tasks = new Stack<>();
+
+ /**
+ * A task that is currently applying and may generate new RelNode.
+ * It provides a callback to schedule tasks for new RelNodes that
+ * are registered during task performing.
+ */
+ private GeneratorTask applying = null;
+
+ /**
+ * RelNodes that are generated by passThrough or derive;
+ * these nodes will not take part in another passThrough or derive.
+ */
+ private Set<RelNode> passThroughCache = new HashSet<>();
+
+ //~ Constructors -----------------------------------------------------------
+
+ /**
+ * Creates a driver bound to the given planner and creates the
+ * TopDownRuleQueue it pops rule matches from.
+ */
+ TopDownRuleDriver(VolcanoPlanner planner) {
+ this.planner = planner;
+ ruleQueue = new TopDownRuleQueue(planner);
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+  /**
+   * Runs the task loop: seeds the task stack with an OptimizeGroup task for
+   * the planner root (bounded by the planner's infinite cost), schedules the
+   * extra exploration roots, then pops and performs tasks until the stack
+   * is empty. Each task is handed to a TaskDescriptor for logging before it
+   * is performed.
+   */
+  @Override public void drive() {
+    final OptimizeGroup rootTask = new OptimizeGroup(planner.root, planner.infCost);
+    tasks.push(rootTask);
+    exploreMaterializationRoots();
+
+    final TaskDescriptor taskLog = new TaskDescriptor();
+    while (!tasks.isEmpty()) {
+      final Task current = tasks.pop();
+      taskLog.log(current);
+      current.perform();
+    }
+  }
+
+ private void exploreMaterializationRoots() {
+ for (RelSubset extraRoot : planner.explorationRoots) {
+ RelSet rootSet = VolcanoPlanner.equivRoot(extraRoot.set);
+ if (rootSet == planner.root.set) {
+ continue;
+ }
+ for (RelNode rel : extraRoot.set.rels) {
+ if (planner.isLogical(rel)) {
+ tasks.push(new OptimizeMExpr(rel, extraRoot, true));
+ }
+ }
+ }
+ }
+
+ /** Returns the TopDownRuleQueue owned by this driver. */
+ @Override public TopDownRuleQueue getRuleQueue() {
+ return ruleQueue;
+ }
+
+ /**
+ * Resets this driver: clears the rule queue, the pending task stack and the
+ * passThrough cache, and forgets the currently applying generator task.
+ */
+ @Override public void clear() {
+ ruleQueue.clear();
+ tasks.clear();
+ passThroughCache.clear();
+ applying = null;
+ }
+
+ /** A piece of work without arguments or result, run by applyGenerator. */
+ private interface Procedure {
+ void exec();
+ }
+
+ private void applyGenerator(GeneratorTask task, Procedure proc) {
+ GeneratorTask applying = this.applying;
+ this.applying = task;
+ try {
+ proc.exec();
+ } finally {
+ this.applying = applying;
+ }
+ }
+
+  /**
+   * Callback when RelSets are merged. Clears the processed state reachable
+   * from the merged set, with no generator task installed while doing so.
+   *
+   * @param set the merged result set
+   */
+  @Override public void onSetMerged(RelSet set) {
+    applyGenerator(null, () -> clearProcessed(set));
+  }
+
+ /**
+ * Resets the exploring state of {@code set} and the task state of its
+ * subsets, and propagates the reset to the sets of parent RelNodes.
+ *
+ * <p>For each subset, the recursion into parents happens when
+ * {@code resetTaskState()} returns true or when the set had a non-null
+ * exploring state on entry. If such a subset is the planner root, a new
+ * OptimizeGroup task with the infinite cost bound is pushed for it.
+ */
+ private void clearProcessed(RelSet set) {
+ // remember whether the set had an exploring state before wiping it
+ boolean explored = set.exploringState != null;
+ set.exploringState = null;
+
+ for (RelSubset subset : set.subsets) {
+ // resetTaskState() is the left operand, so it is called for every subset
+ if (subset.resetTaskState() || explored) {
+ Collection<RelNode> parentRels = subset.getParentRels();
+ for (RelNode parentRel : parentRels) {
+ clearProcessed(planner.getSet(parentRel));
+ }
+ if (subset == planner.root) {
+ tasks.push(new OptimizeGroup(subset, planner.infCost));
+ }
+ }
+ }
+ }
+
+  /**
+   * Callback when a new RelNode is added into a RelSet. Schedules follow-up
+   * tasks for the node, but only while a generator task is applying and the
+   * node landed in the set of that task's group.
+   *
+   * <p>Logical nodes get an OptimizeMExpr task. Non-logical nodes get an
+   * input-optimizing task for a chosen subset, plus one for every subset
+   * they may be passed through to.
+   *
+   * @param node the new RelNode
+   * @param subset the subset the node was added to
+   */
+  @Override public void onProduce(RelNode node, RelSubset subset) {
+    if (applying == null || subset.set
+        != VolcanoPlanner.equivRoot(applying.group().set)) {
+      return;
+    }
+
+    // give the applying task a chance to veto further scheduling
+    if (!applying.onProduce(node)) {
+      return;
+    }
+
+    if (!planner.isLogical(node)) {
+      RelSubset optimizingGroup = null;
+      boolean canPassThrough = node instanceof PhysicalNode
+          && !passThroughCache.contains(node);
+      if (!canPassThrough && subset.taskState != null) {
+        optimizingGroup = subset;
+      } else {
+        RelOptCost upperBound = planner.zeroCost;
+        RelSet set = subset.getSet();
+        List<RelSubset> subsetsToPassThrough = new ArrayList<>();
+        for (RelSubset otherSubset : set.subsets) {
+          // note: && binds tighter than ||, so a subset is skipped when it is
+          // not required, or when it is neither the root nor OPTIMIZING
+          if (!otherSubset.isRequired() || otherSubset != planner.root
+              && otherSubset.taskState != RelSubset.OptimizeState.OPTIMIZING) {
+            continue;
+          }
+          if (node.getTraitSet().satisfies(otherSubset.getTraitSet())) {
+            // among satisfied subsets keep the one with the largest upper bound
+            if (upperBound.isLt(otherSubset.upperBound)) {
+              upperBound = otherSubset.upperBound;
+              optimizingGroup = otherSubset;
+            }
+          } else if (canPassThrough) {
+            subsetsToPassThrough.add(otherSubset);
+          }
+        }
+        for (RelSubset otherSubset : subsetsToPassThrough) {
+          Task task = getOptimizeInputTask(node, otherSubset);
+          if (task != null) {
+            tasks.push(task);
+          }
+        }
+      }
+      if (optimizingGroup == null) {
+        return;
+      }
+      Task task = getOptimizeInputTask(node, optimizingGroup);
+      if (task != null) {
+        tasks.push(task);
+      }
+    } else {
+      boolean optimizing = subset.set.subsets.stream()
+          .anyMatch(s -> s.taskState == RelSubset.OptimizeState.OPTIMIZING);
+      // stay in explore mode only if the applying task is exploring and no
+      // subset of the set is currently being optimized
+      tasks.push(
+          new OptimizeMExpr(node, applying.group(),
+              applying.exploring() && !optimizing));
+    }
+  }
+
+ //~ Inner Classes ----------------------------------------------------------
+
+ /**
+ * Base class for planner task.
+ *
+ * <p>{@code perform()} executes the task; {@code describe()} reports the
+ * task's items to a TaskDescriptor for logging.
+ */
+ private interface Task {
+ void perform();
+ void describe(TaskDescriptor desc);
+ }
+
+  /**
+   * A class for task logging.
+   *
+   * <p>For each task it builds one line of the form
+   * {@code Execute task: <TaskClass>(name=value, ...)} and writes it to the
+   * logger at DEBUG level. The parenthesized list is omitted when the task
+   * describes no items.
+   */
+  private static class TaskDescriptor {
+    /** Whether no item has been appended yet for the task being logged. */
+    private boolean first = true;
+    /** Buffer reused across calls to {@link #log(Task)}. */
+    private final StringBuilder builder = new StringBuilder();
+
+    /**
+     * Logs a description of {@code task}; does nothing unless DEBUG logging
+     * is enabled.
+     */
+    void log(Task task) {
+      if (!LOGGER.isDebugEnabled()) {
+        return;
+      }
+      first = true;
+      builder.setLength(0);
+      builder.append("Execute task: ").append(task.getClass().getSimpleName());
+      task.describe(this);
+      if (!first) {
+        // at least one item was appended, so close the "(" opened by item()
+        builder.append(")");
+      }
+
+      // emit at the same level as the guard above
+      LOGGER.debug(builder.toString());
+    }
+
+    /**
+     * Appends {@code name=value} to the current description.
+     *
+     * @return this descriptor, so that calls can be chained
+     */
+    TaskDescriptor item(String name, Object value) {
+      if (first) {
+        first = false;
+        builder.append("(");
+      } else {
+        builder.append(", ");
+      }
+      builder.append(name).append("=").append(value);
+      return this;
+    }
+  }
+
+ /**
+ * A task that may generate new RelNodes while it is performed.
+ *
+ * <p>{@code group()} is the subset the task works for, {@code exploring()}
+ * tells whether the task runs in explore mode, and {@code onProduce()} is
+ * consulted for each produced node; by default it returns true.
+ */
+ private interface GeneratorTask extends Task {
+ RelSubset group();
+ boolean exploring();
+ default boolean onProduce(RelNode node) {
+ return true;
+ }
+ }
+
+  /**
+   * O_GROUP.
+   *
+   * <p>Optimizes a group (RelSubset) under a cost upper bound: schedules an
+   * OptimizeMExpr task for each logical RelNode of the group's set and an
+   * input-optimizing task for each non-logical one, preceded on the stack by
+   * a GroupOptimized gate keeper that therefore runs after them.
+   */
+  private class OptimizeGroup implements Task {
+    private final RelSubset group;
+    private final RelOptCost upperBound;
+
+    OptimizeGroup(RelSubset group, RelOptCost upperBound) {
+      this.group = group;
+      this.upperBound = upperBound;
+    }
+
+    @Override public void perform() {
+      RelOptCost winner = group.getWinnerCost();
+      if (winner != null) {
+        return;
+      }
+
+      if (group.taskState != null && upperBound.isLe(group.upperBound)) {
+        // this group failed to optimize before or it is a ring
+        return;
+      }
+
+      group.startOptimize(upperBound);
+
+      // cannot decide an actual lower bound before MExpr are fully explored
+      // so delay the lower bound checking
+
+      // a gate keeper to update context
+      tasks.push(new GroupOptimized(group));
+
+      // optimize mExprs in group
+      List<RelNode> physicals = new ArrayList<>();
+      for (RelNode rel : group.set.rels) {
+        if (planner.isLogical(rel)) {
+          tasks.push(new OptimizeMExpr(rel, group, false));
+        } else if (rel.isEnforcer()) {
+          // Enforcers have lower priority than other physical nodes:
+          // placed at the head of the list they are pushed first and
+          // therefore popped last
+          physicals.add(0, rel);
+        } else {
+          physicals.add(rel);
+        }
+      }
+      // always apply O_INPUTS first so as to get a valid upper bound;
+      // these tasks are pushed last, so they are popped before the
+      // OptimizeMExpr tasks above
+      for (RelNode rel : physicals) {
+        Task task = getOptimizeInputTask(rel, group);
+        if (task != null) {
+          tasks.push(task);
+        }
+      }
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("group", group).item("upperBound", upperBound);
+    }
+  }
+
+  /**
+   * Gate-keeper task that flags a group as optimized when it is performed.
+   */
+  private static class GroupOptimized implements Task {
+    /** The subset to flag. */
+    private final RelSubset target;
+
+    GroupOptimized(RelSubset group) {
+      target = group;
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("group", target);
+    }
+
+    @Override public void perform() {
+      target.setOptimized();
+    }
+  }
+
+ /**
+ * O_EXPR.
+ *
+ * <p>Schedules the exploration of every input of an expression followed by
+ * the application of rules to the expression itself. In explore mode the
+ * task is a no-op when the group is already explored.
+ */
+ private class OptimizeMExpr implements Task {
+ private final RelNode mExpr;
+ private final RelSubset group;
+ private final boolean explore;
+
+ OptimizeMExpr(RelNode mExpr,
+ RelSubset group, boolean explore) {
+ this.mExpr = mExpr;
+ this.group = group;
+ this.explore = explore;
+ }
+
+ @Override public void perform() {
+ if (explore && group.isExplored()) {
+ return;
+ }
+ // 1. explore input
+ // 2. apply other rules
+ // ApplyRules is pushed first so that it is popped after the ExploreInput
+ // tasks; inputs are pushed in reverse so that input 0 is popped first
+ tasks.push(new ApplyRules(mExpr, group, explore));
+ for (int i = mExpr.getInputs().size() - 1; i >= 0; --i) {
+ tasks.push(new ExploreInput(mExpr, i));
+ }
+ }
+
+ @Override public void describe(TaskDescriptor desc) {
+ desc.item("mExpr", mExpr).item("explore", explore);
+ }
+ }
+
+ /**
+ * Ensure ExploreInput are working on the correct input group since calcite
+ * may merge sets.
+ *
+ * <p>If the parent's input at the given ordinal is no longer the subset this
+ * task was created for, a new ExploreInput task is scheduled instead;
+ * otherwise the subset is marked explored.
+ */
+ private class EnsureGroupExplored implements Task {
+
+ private final RelSubset input;
+ private final RelNode parent;
+ private final int inputOrdinal;
+
+ EnsureGroupExplored(RelSubset input, RelNode parent, int inputOrdinal) {
+ this.input = input;
+ this.parent = parent;
+ this.inputOrdinal = inputOrdinal;
+ }
+
+ @Override public void perform() {
+ // the parent now points at a different subset: explore that one instead
+ if (parent.getInput(inputOrdinal) != input) {
+ tasks.push(new ExploreInput(parent, inputOrdinal));
+ return;
+ }
+ input.setExplored();
+ for (RelSubset subset : input.getSet().subsets) {
+ // clear the LB cache as exploring state have changed
+ input.getCluster().getMetadataQuery().clearCache(subset);
+ }
+ }
+
+ @Override public void describe(TaskDescriptor desc) {
+ desc.item("mExpr", parent).item("i", inputOrdinal);
+ }
+ }
+
+ /**
+ * E_GROUP.
+ *
+ * <p>Explores the input subset of a parent RelNode at a given ordinal: when
+ * {@code group.explore()} returns true, it schedules an exploring
+ * OptimizeMExpr task for every logical RelNode of the input's set, and an
+ * EnsureGroupExplored task that is popped after them.
+ */
+ private class ExploreInput implements Task {
+ private final RelSubset group;
+ private final RelNode parent;
+ private final int inputOrdinal;
+
+ ExploreInput(RelNode parent, int inputOrdinal) {
+ // the input subset is captured at construction time
+ this.group = (RelSubset) parent.getInput(inputOrdinal);
+ this.parent = parent;
+ this.inputOrdinal = inputOrdinal;
+ }
+
+ @Override public void perform() {
+ if (!group.explore()) {
+ return;
+ }
+ // pushed first, hence popped after the OptimizeMExpr tasks below
+ tasks.push(new EnsureGroupExplored(group, parent, inputOrdinal));
+ for (RelNode rel : group.set.rels) {
+ if (planner.isLogical(rel)) {
+ tasks.push(new OptimizeMExpr(rel, group, true));
+ }
+ }
+ }
+
+ @Override public void describe(TaskDescriptor desc) {
+ desc.item("group", group);
+ }
+ }
+
+  /**
+   * Drains the rule matches queued for an expression and turns each of them
+   * into an ApplyRule task on the task stack. In exploring mode only matches
+   * accepted by {@code planner::isTransformationRule} are taken.
+   */
+  private class ApplyRules implements Task {
+    private final RelNode mExpr;
+    private final RelSubset group;
+    private final boolean exploring;
+
+    ApplyRules(RelNode mExpr, RelSubset group, boolean exploring) {
+      this.mExpr = mExpr;
+      this.group = group;
+      this.exploring = exploring;
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("mExpr", mExpr).item("exploring", exploring);
+    }
+
+    @Override public void perform() {
+      final Pair<RelNode, Predicate<VolcanoRuleMatch>> selector;
+      if (exploring) {
+        selector = Pair.of(mExpr, planner::isTransformationRule);
+      } else {
+        selector = Pair.of(mExpr, m -> true);
+      }
+      for (VolcanoRuleMatch next = ruleQueue.popMatch(selector);
+          next != null;
+          next = ruleQueue.popMatch(selector)) {
+        tasks.push(new ApplyRule(next, group, exploring));
+      }
+    }
+  }
+
+ /**
+ * APPLY_RULE.
+ *
+ * <p>Fires a single rule match. The task installs itself as the applying
+ * generator task while the match runs.
+ */
+ private class ApplyRule implements GeneratorTask {
+ private final VolcanoRuleMatch match;
+ private final RelSubset group;
+ private final boolean exploring;
+
+ ApplyRule(VolcanoRuleMatch match, RelSubset group, boolean exploring) {
+ this.match = match;
+ this.group = group;
+ this.exploring = exploring;
+ }
+
+ @Override public void describe(TaskDescriptor desc) {
+ desc.item("match", match).item("exploring", exploring);
+ }
+
+ @Override public void perform() {
+ applyGenerator(this, match::onMatch);
+ }
+
+ @Override public RelSubset group() {
+ return group;
+ }
+
+ @Override public boolean exploring() {
+ return exploring;
+ }
+ }
+
+  /**
+   * Builds the task that optimizes the inputs of {@code rel} on behalf of
+   * {@code group}.
+   *
+   * <p>If the traits of {@code rel} do not satisfy those of the group, the
+   * node is first converted and the converted node is registered into the
+   * group with no generator task installed. When every input already has a
+   * winner cost a DeriveTrait task is returned; otherwise an OptimizeInput1
+   * (exactly one input) or OptimizeInputs task.
+   *
+   * @return the task to schedule, or null if {@code rel} could not be
+   *     converted to the traits of {@code group}
+   */
+  private Task getOptimizeInputTask(RelNode rel, RelSubset group) {
+    RelNode target = rel;
+    if (!rel.getTraitSet().satisfies(group.getTraitSet())) {
+      final RelNode converted = convert(rel, group);
+      if (converted == null) {
+        LOGGER.debug("Skip optimizing because of traits: {}", rel);
+        return null;
+      }
+      applyGenerator(null, () ->
+          planner.register(converted, group));
+      target = converted;
+    }
+
+    boolean allInputsHaveWinner = true;
+    for (RelNode input : target.getInputs()) {
+      if (((RelSubset) input).getWinnerCost() == null) {
+        allInputsHaveWinner = false;
+        break;
+      }
+    }
+    if (allInputsHaveWinner) {
+      return new DeriveTrait(target, group);
+    }
+    if (target.getInputs().size() != 1) {
+      return new OptimizeInputs(target, group);
+    }
+    return new OptimizeInput1(target, group);
+  }
+
+  /**
+   * Tries to obtain, for {@code rel}, a RelNode that fits {@code group}.
+   *
+   * <p>Unless {@code rel} is in the passThrough cache, it first tries
+   * {@code group.passThrough(rel)}, provided the lower bound check passes. A
+   * non-null result is added to the passThrough cache and returned.
+   * Otherwise at most one queued ConverterRule match for {@code rel} whose
+   * out trait satisfies the group's convention is scheduled as an ApplyRule
+   * task, and null is returned.
+   *
+   * @return the passed-through RelNode, or null if there is none
+   */
+  private RelNode convert(RelNode rel, RelSubset group) {
+    if (!passThroughCache.contains(rel)) {
+      if (checkLowerBound(rel, group)) {
+        RelNode passThrough = group.passThrough(rel);
+        if (passThrough != null) {
+          passThroughCache.add(passThrough);
+          return passThrough;
+        }
+      } else {
+        // the first argument is the RelNode, not its lower bound
+        LOGGER.debug("Skip pass through because of lower bound. rel = {}, UB = {}",
+            rel, group.upperBound);
+      }
+    }
+    VolcanoRuleMatch match = ruleQueue.popMatch(
+        Pair.of(rel,
+            m -> m.getRule() instanceof ConverterRule
+                && m.getRule().getOutTrait().satisfies(group.getTraitSet().getConvention())));
+    if (match != null) {
+      tasks.push(new ApplyRule(match, group, false));
+    }
+    return null;
+  }
+
+ private boolean checkLowerBound(RelNode rel, RelSubset group) {
+ RelOptCost upperBound = group.upperBound;
+ if (upperBound.isInfinite()) {
+ return true;
+ }
+ RelOptCost lb = planner.getLowerBound(rel);
+ return !upperBound.isLe(lb);
+ }
+
+ /**
+ * O_INPUT when there is only one input.
+ *
+ * <p>Computes the upper bound left for the single input and, unless that
+ * bound is less than or equal to zero cost, schedules the optimization of
+ * the input group followed by a CheckInput and a DeriveTrait task.
+ */
+ private class OptimizeInput1 implements Task {
+
+ private final RelNode mExpr;
+ private final RelSubset group;
+
+ OptimizeInput1(RelNode mExpr, RelSubset group) {
+ this.mExpr = mExpr;
+ this.group = group;
+ }
+
+
+ @Override public void describe(TaskDescriptor desc) {
+ desc.item("mExpr", mExpr).item("upperBound", group.upperBound);
+ }
+
+ @Override public void perform() {
+ RelOptCost upperBound = group.upperBound;
+ RelOptCost upperForInput = planner.upperBoundForInputs(mExpr, upperBound);
+ if (upperForInput.isLe(planner.zeroCost)) {
+ LOGGER.debug(
+ "Skip O_INPUT because of lower bound. UB4Inputs = {}, UB = {}",
+ upperForInput, upperBound);
+ return;
+ }
+
+ RelSubset input = (RelSubset) mExpr.getInput(0);
+
+ // Apply enforcing rules
+ tasks.push(new DeriveTrait(mExpr, group));
+
+ // pop order is the reverse of push order:
+ // OptimizeGroup, then CheckInput, then DeriveTrait
+ tasks.push(new CheckInput(null, mExpr, input, 0, upperForInput));
+ tasks.push(new OptimizeGroup(input, upperForInput));
+ }
+ }
+
+ /**
+ * O_INPUT.
+ *
+ * <p>Optimizes the inputs of an expression one at a time. The task is
+ * stateful: it re-pushes itself onto the task stack after scheduling an
+ * input, and resumes from {@code processingChild} when performed again.
+ * {@code lowerBoundSum} and {@code lowerBounds} are also updated by
+ * CheckInput tasks created here.
+ */
+ private class OptimizeInputs implements Task {
+
+ private final RelNode mExpr;
+ private final RelSubset group;
+ private final int childCount;
+ private RelOptCost upperBound;
+ private RelOptCost upperForInput;
+ private int processingChild;
+
+ OptimizeInputs(RelNode rel, RelSubset group) {
+ this.mExpr = rel;
+ this.group = group;
+ this.upperBound = group.upperBound;
+ this.upperForInput = planner.infCost;
+ this.childCount = rel.getInputs().size();
+ this.processingChild = 0;
+ }
+
+ @Override public void describe(TaskDescriptor desc) {
+ desc.item("mExpr", mExpr).item("upperBound", upperBound)
+ .item("processingChild", processingChild);
+ }
+
+ // per-input lower bounds and their sum; both stay null until the group
+ // has a finite best cost
+ private List<RelOptCost> lowerBounds;
+ private RelOptCost lowerBoundSum;
+ @Override public void perform() {
+ RelOptCost bestCost = group.bestCost;
+ if (!bestCost.isInfinite()) {
+ // tighten the upper bound with the best cost found so far
+ if (bestCost.isLt(upperBound)) {
+ upperBound = bestCost;
+ upperForInput = planner.upperBoundForInputs(mExpr, upperBound);
+ }
+
+ if (lowerBoundSum == null) {
+ if (upperForInput.isInfinite()) {
+ upperForInput = planner.upperBoundForInputs(mExpr, upperBound);
+ }
+ lowerBounds = new ArrayList<>(childCount);
+ for (RelNode input : mExpr.getInputs()) {
+ RelOptCost lb = planner.getLowerBound(input);
+ lowerBounds.add(lb);
+ lowerBoundSum = lowerBoundSum == null ? lb : lowerBoundSum.plus(lb);
+ }
+ }
+ if (upperForInput.isLt(lowerBoundSum)) {
+ LOGGER.debug(
+ "Skip O_INPUT because of lower bound. LB = {}, UP = {}",
+ lowerBoundSum, upperForInput);
+ return; // group pruned
+ }
+ }
+
+ // CheckInput sets lowerBoundSum to the infinite cost when an input has
+ // no winner
+ if (lowerBoundSum != null && lowerBoundSum.isInfinite()) {
+ LOGGER.debug("Skip O_INPUT as one of the inputs fail to optimize");
+ return;
+ }
+
+ if (processingChild == 0) {
+ // Apply enforcing rules
+ tasks.push(new DeriveTrait(mExpr, group));
+ }
+
+ while (processingChild < childCount) {
+ RelSubset input =
+ (RelSubset) mExpr.getInput(processingChild);
+
+ // inputs that already have a winner need no further work
+ RelOptCost winner = input.getWinnerCost();
+ if (winner != null) {
+ ++ processingChild;
+ continue;
+ }
+
+ // budget for this input: the bound for all inputs minus the lower
+ // bounds of the other inputs
+ RelOptCost upper = upperForInput;
+ if (!upper.isInfinite()) {
+ upper = upperForInput.minus(lowerBoundSum)
+ .plus(lowerBounds.get(processingChild));
+ }
+ if (input.taskState != null && upper.isLe(input.upperBound)) {
+ return;
+ }
+
+ // come back for the remaining inputs after this one has been handled
+ if (processingChild != childCount - 1) {
+ tasks.push(this);
+ }
+ tasks.push(new CheckInput(this, mExpr, input, processingChild, upper));
+ tasks.push(new OptimizeGroup(input, upper));
+ ++ processingChild;
+ break;
+ }
+ }
+ }
+
+ /**
+ * Ensure input is optimized correctly and modify context.
+ *
+ * <p>If the parent's i-th input is no longer the subset this task was
+ * created for, the task re-targets itself and schedules the optimization of
+ * the new subset. Otherwise, when a context is present, it folds the input's
+ * winner cost into the context's lower bounds, or marks the context as
+ * failed when the input has no winner.
+ */
+ private class CheckInput implements Task {
+
+ // the OptimizeInputs task to update; null when created by OptimizeInput1
+ private final OptimizeInputs context;
+ private final RelOptCost upper;
+ private final RelNode parent;
+ private RelSubset input;
+ private final int i;
+
+ @Override public void describe(TaskDescriptor desc) {
+ desc.item("parent", parent).item("i", i);
+ }
+
+ CheckInput(OptimizeInputs context,
+ RelNode parent, RelSubset input, int i, RelOptCost upper) {
+ this.context = context;
+ this.parent = parent;
+ this.input = input;
+ this.i = i;
+ this.upper = upper;
+ }
+
+ @Override public void perform() {
+ if (input != parent.getInput(i)) {
+ // the parent now points at another subset: optimize that one, then
+ // run this check again
+ input = (RelSubset) parent.getInput(i);
+ tasks.push(this);
+ tasks.push(new OptimizeGroup(input, upper));
+ return;
+ }
+ if (context == null) {
+ return;
+ }
+ RelOptCost winner = input.getWinnerCost();
+ if (winner == null) {
+ // the input fail to optimize due to group pruning
+ context.lowerBoundSum = planner.infCost;
+ return;
+ }
+ // replace the i-th lower bound with the actual winner cost
+ if (context.lowerBoundSum != null && context.lowerBoundSum != planner.infCost) {
+ context.lowerBoundSum = context.
+ lowerBoundSum.minus(context.lowerBounds.get(i));
+ context.lowerBoundSum = context.lowerBoundSum.plus(winner);
+ context.lowerBounds.set(i, winner);
+ }
+ }
+ }
+
+ /**
+ * Derives new traits for a physical expression from its inputs.
+ *
+ * <p>Once every input has a winner cost, the task schedules an ApplyRules
+ * task for the expression and, unless the expression is in the passThrough
+ * cache, registers the RelNodes derived from the traits of the input
+ * subsets. Every node produced while deriving is added to the passThrough
+ * cache.
+ */
+ private class DeriveTrait implements GeneratorTask {
+
+ private final RelNode mExpr;
+ private final RelSubset group;
+
+ DeriveTrait(RelNode mExpr, RelSubset group) {
+ this.mExpr = mExpr;
+ this.group = group;
+ }
+
+ @Override public void perform() {
+ List<RelNode> inputs = mExpr.getInputs();
+ for (RelNode input : inputs) {
+ if (((RelSubset) input).getWinnerCost() == null) {
+ // fail to optimize input, then no need to deliver traits
+ return;
+ }
+ }
+ tasks.push(new ApplyRules(mExpr, group, false));
+ if (!passThroughCache.contains(mExpr)) {
+ applyGenerator(this, this::derive);
+ }
+ }
+
+ /**
+ * Registers RelNodes derived from the traits of the input subsets,
+ * according to the derive mode of the expression. Does nothing unless the
+ * expression is a PhysicalNode whose derive mode is not PROHIBITED.
+ */
+ private void derive() {
+ if (!(mExpr instanceof PhysicalNode)
+ || ((PhysicalNode) mExpr).getDeriveMode() == DeriveMode.PROHIBITED) {
+ return;
+ }
+
+ PhysicalNode rel = (PhysicalNode) mExpr;
+ DeriveMode mode = rel.getDeriveMode();
+ int arity = rel.getInputs().size();
+ // for OMAKASE
+ List<List<RelTraitSet>> inputTraits = new ArrayList<>(arity);
+
+ for (int i = 0; i < arity; i++) {
+ int childId = i;
+ if (mode == DeriveMode.RIGHT_FIRST) {
+ childId = arity - i - 1;
+ }
+
+ RelSubset input = (RelSubset) rel.getInput(childId);
+ List<RelTraitSet> traits = new ArrayList<>();
+ inputTraits.add(traits);
+
+ // indexed loop with the size captured up front
+ final int numSubset = input.set.subsets.size();
+ for (int j = 0; j < numSubset; j++) {
+ RelSubset subset = input.set.subsets.get(j);
+ if (!subset.isDelivered() || equalsSansConvention(
+ subset.getTraitSet(), rel.getCluster().traitSet())) {
+ // Ideally we should stop deriving new relnodes when the
+ // subset's traitSet equals with input traitSet, but
+ // in case someone manually builds a physical relnode
+ // tree, which is highly discouraged, without specifying
+ // correct traitSet, e.g.
+ // EnumerableFilter [].ANY
+ // -> EnumerableMergeJoin [a].Hash[a]
+ // We should still be able to derive the correct traitSet
+ // for the dumb filter, even though the filter's traitSet
+ // should be derived from the MergeJoin when it is created.
+ // But if the subset's traitSet equals with the default
+ // empty traitSet sans convention (the default traitSet
+ // from cluster may have logical convention, NONE, which
+ // is not interesting), we are safe to ignore it, because
+ // a physical filter with non default traitSet, but has a
+ // input with default empty traitSet, e.g.
+ // EnumerableFilter [a].Hash[a]
+ // -> EnumerableProject [].ANY
+ // is definitely wrong, we should fail fast.
+ continue;
+ }
+
+ if (mode == DeriveMode.OMAKASE) {
+ traits.add(subset.getTraitSet());
+ } else {
+ RelNode newRel = rel.derive(subset.getTraitSet(), childId);
+ if (newRel != null && !planner.isRegistered(newRel)) {
+ RelNode newInput = newRel.getInput(childId);
+ assert newInput instanceof RelSubset;
+ if (newInput == subset) {
+ // If the child subset is used to derive new traits for
+ // current relnode, the subset will be marked REQUIRED
+ // when registering the new derived relnode and later
+ // will add enforcers between other delivered subsets.
+ // e.g. a MergeJoin request both inputs hash distributed
+ // by [a,b] sorted by [a,b]. If the left input R1 happens to
+ // be distributed by [a], the MergeJoin can derive new
+ // traits from this input and request both input to be
+ // distributed by [a] sorted by [a,b]. In case there is a
+ // alternative R2 with ANY distribution in the left input's
+ // RelSet, we may end up with requesting hash distribution
+ // [a] on alternative R2, which is unnecessary and waste,
+ // because we request distribution by [a] because of R1 can
+ // deliver the exact same distribution and we don't need to
+ // enforce properties on other subsets that can't satisfy
+ // the specific trait requirement.
+ // Here we add a constraint that {@code newInput == subset},
+ // because if the delivered child subset is HASH[a], but
+ // we require HASH[a].SORT[a,b], we still need to enable
+ // property enforcement on the required subset. Otherwise,
+ // we need to restrict enforcement between HASH[a].SORT[a,b]
+ // and HASH[a] only, which will make things a little bit
+ // complicated. We might optimize it in the future.
+ subset.disableEnforcing();
+ }
+ RelSubset relSubset = planner.register(newRel, rel);
+ assert relSubset.set == planner.getSubset(rel).set;
+ }
+ }
+ }
+
+ // LEFT_FIRST and RIGHT_FIRST only look at the first child visited
+ if (mode == DeriveMode.LEFT_FIRST
+ || mode == DeriveMode.RIGHT_FIRST) {
+ break;
+ }
+ }
+
+ if (mode == DeriveMode.OMAKASE) {
+ List<RelNode> relList = rel.derive(inputTraits);
+ for (RelNode relNode : relList) {
+ if (!planner.isRegistered(relNode)) {
+ planner.register(relNode, rel);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Returns whether the 2 traitSets are equal without Convention.
+ * It assumes they have the same traitDefs order.
+ */
+ private boolean equalsSansConvention(RelTraitSet ts1, RelTraitSet ts2) {
+ assert ts1.size() == ts2.size();
+ for (int i = 0; i < ts1.size(); i++) {
+ RelTrait trait = ts1.getTrait(i);
+ if (trait.getTraitDef() == ConventionTraitDef.INSTANCE) {
+ continue;
+ }
+ if (!trait.equals(ts2.getTrait(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override public void describe(TaskDescriptor desc) {
+ desc.item("mExpr", mExpr).item("group", group);
+ }
+
+ @Override public RelSubset group() {
+ return group;
+ }
+
+ @Override public boolean exploring() {
+ return false;
+ }
+
+ // every node produced while deriving is remembered in the passThrough cache
+ @Override public boolean onProduce(RelNode node) {
+ passThroughCache.add(node);
+ return true;
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleQueue.java b/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleQueue.java
new file mode 100644
index 0000000..2c142c1
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleQueue.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.Pair;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * A rule queue that manages rule matches for the top-down (cascades-style)
+ * planner.
+ *
+ * <p>Matches are bucketed by the rel returned from {@code match.rel(0)}, so
+ * that callers can pop the matches of one rel at a time.
+ */
+class TopDownRuleQueue extends RuleQueue {
+
+  /** Pending matches, keyed by {@code match.rel(0)}. */
+  private final Map<RelNode, List<VolcanoRuleMatch>> matches = new HashMap<>();
+
+  /**
+   * String forms of every match accepted since the last {@link #clear()};
+   * used to reject duplicates. Entries are not removed when a match is popped.
+   */
+  private final Set<String> names = new HashSet<>();
+
+  TopDownRuleQueue(VolcanoPlanner planner) {
+    super(planner);
+  }
+
+  /**
+   * Queues a match under the rel returned by {@code match.rel(0)}, unless a
+   * match with the same string form has already been accepted.
+   *
+   * @param match the rule match to enqueue
+   */
+  @Override public void addMatch(VolcanoRuleMatch match) {
+    RelNode rel = match.rel(0);
+    List<VolcanoRuleMatch> queue =
+        matches.computeIfAbsent(rel, key -> new LinkedList<>());
+    addMatch(match, queue);
+  }
+
+  /**
+   * Inserts a match into the given per-rel queue. Substitution-rule matches
+   * go to the tail; every other match goes to the head, so it is visited
+   * before any substitution-rule match when the queue is scanned.
+   */
+  private void addMatch(VolcanoRuleMatch match, List<VolcanoRuleMatch> queue) {
+    if (!names.add(match.toString())) {
+      // A match with this string form was accepted before; do not queue twice.
+      return;
+    }
+
+    if (planner.isSubstituteRule(match)) {
+      queue.add(match);
+    } else {
+      queue.add(0, match);
+    }
+  }
+
+  /**
+   * Removes and returns the first queued match of a rel that passes the
+   * optional filter and is not rejected by {@code skipMatch}.
+   *
+   * <p>Matches that fail the filter stay queued. Matches that pass the filter
+   * but are rejected by {@code skipMatch} are discarded.
+   *
+   * @param category pair of the rel whose matches are wanted and an optional
+   *                 filter ({@code null} accepts every match)
+   * @return the next usable match, or {@code null} if there is none
+   */
+  public VolcanoRuleMatch popMatch(Pair<RelNode, Predicate<VolcanoRuleMatch>> category) {
+    List<VolcanoRuleMatch> queue = matches.get(category.left);
+    if (queue == null) {
+      return null;
+    }
+    Iterator<VolcanoRuleMatch> iterator = queue.iterator();
+    while (iterator.hasNext()) {
+      VolcanoRuleMatch next = iterator.next();
+      if (category.right != null && !category.right.test(next)) {
+        // Filtered out: leave it queued for a later request.
+        continue;
+      }
+      iterator.remove();
+      if (!skipMatch(next)) {
+        return next;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Drops every queued match and forgets all recorded match names.
+   *
+   * @return {@code true} if at least one per-rel queue existed before the
+   * call; a queue that has already been drained still counts, because
+   * drained queues are never removed from the map
+   */
+  @Override public boolean clear() {
+    boolean empty = matches.isEmpty();
+    matches.clear();
+    names.clear();
+    return !empty;
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
index 54615db..e389863 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
@@ -48,6 +48,7 @@ import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdUtil;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.SubstitutionRule;
import org.apache.calcite.rel.rules.TransformationRule;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.SqlExplainLevel;
@@ -59,6 +60,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
+import org.apiguardian.api.API;
+
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayDeque;
@@ -133,9 +136,9 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
private final Set<RelOptSchema> registeredSchemas = new HashSet<>();
/**
- * Holds rule calls waiting to be fired.
+ * A driver to manage rule and rule matches.
*/
- final RuleQueue ruleQueue = new RuleQueue(this);
+ RuleDriver ruleDriver;
/**
* Holds the currently registered RelTraitDefs.
@@ -146,6 +149,8 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
private RelNode originalRoot;
+ private Convention rootConvention;
+
/**
* Whether the planner can accept new rules.
*/
@@ -171,22 +176,21 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
/** Zero cost, according to {@link #costFactory}. Not necessarily a
* {@link org.apache.calcite.plan.volcano.VolcanoCost}. */
- private final RelOptCost zeroCost;
+ final RelOptCost zeroCost;
- /**
- * Optimization tasks including trait propagation, enforcement.
- */
- final Deque<OptimizeTask> tasks = new ArrayDeque<>();
+ /** Infinite cost, according to {@link #costFactory}. Not necessarily a
+ * {@link org.apache.calcite.plan.volcano.VolcanoCost}. */
+ final RelOptCost infCost;
/**
- * The id generator for optimization tasks.
+ * Whether to enable top-down optimization or not.
*/
- int nextTaskId = 0;
+ boolean topDownOpt = CalciteSystemProperty.TOPDOWN_OPT.value();
/**
- * Whether to enable top-down optimization or not.
+ * Extra roots for explorations.
*/
- boolean topDownOpt = CalciteSystemProperty.TOPDOWN_OPT.value();
+ Set<RelSubset> explorationRoots = new HashSet<>();
//~ Constructors -----------------------------------------------------------
@@ -216,9 +220,19 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
super(costFactory == null ? VolcanoCost.FACTORY : costFactory,
externalContext);
this.zeroCost = this.costFactory.makeZeroCost();
+ this.infCost = this.costFactory.makeInfiniteCost();
// If LOGGER is debug enabled, enable provenance information to be captured
this.provenanceMap = LOGGER.isDebugEnabled() ? new HashMap<>()
: Util.blackholeMap();
+ initRuleQueue();
+ }
+
+  /** Installs the rule driver that matches the current optimization mode. */
+  private void initRuleQueue() {
+    // Top-down optimization gets its own driver; otherwise stay iterative.
+    ruleDriver = topDownOpt
+        ? new TopDownRuleDriver(this)
+        : new IterativeRuleDriver(this);
}
//~ Methods ----------------------------------------------------------------
@@ -236,11 +250,15 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
/**
* Enable or disable top-down optimization.
*
- * <p>Note: Enabling top-down optimization will automatically disable
- * the use of AbstractConverter and related rules.</p>
+ * <p>Note: Enabling top-down optimization will automatically enable
+ * top-down trait propagation.</p>
*/
public void setTopDownOpt(boolean value) {
+ if (topDownOpt == value) {
+ return;
+ }
topDownOpt = value;
+ initRuleQueue();
}
// implement RelOptPlanner
@@ -259,6 +277,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
this.originalRoot = rel;
}
+ rootConvention = this.root.getConvention();
ensureRootConverters();
}
@@ -311,6 +330,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
}
for (RelOptMaterialization materialization : applicableMaterializations) {
RelSubset subset = registerImpl(materialization.queryRel, null);
+ explorationRoots.add(subset);
RelNode tableRel2 =
RelOptUtil.createCastRel(
materialization.tableRel,
@@ -382,7 +402,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
this.mapDigestToRel.clear();
this.mapRel2Subset.clear();
this.prunedNodes.clear();
- this.ruleQueue.clear();
+ this.ruleDriver.clear();
this.materializations.clear();
this.latticeByName.clear();
this.provenanceMap.clear();
@@ -490,13 +510,6 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
* Finds the most efficient expression to implement the query given via
* {@link org.apache.calcite.plan.RelOptPlanner#setRoot(org.apache.calcite.rel.RelNode)}.
*
- * <p>The algorithm executes repeatedly in a series of phases. In each phase
- * the exact rules that may be fired varies. The mapping of phases to rule
- * sets is maintained in the {@link #ruleQueue}.
- *
- * <p>In each phase, the planner then iterates over the rule matches presented
- * by the rule queue until the rule queue becomes empty.
- *
* @return the most efficient RelNode tree found for implementing the given
* query
*/
@@ -504,54 +517,14 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
ensureRootConverters();
registerMaterializations();
- PLANNING:
- for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
- while (true) {
- LOGGER.debug("PLANNER = {}; PHASE = {}; COST = {}",
- this, phase.toString(), root.bestCost);
-
- VolcanoRuleMatch match = ruleQueue.popMatch(phase);
- if (match == null) {
- break;
- }
-
- assert match.getRule().matches(match);
- try {
- match.onMatch();
- } catch (VolcanoTimeoutException e) {
- LOGGER.warn("Volcano planning times out, cancels the subsequent optimization.");
- root = canonize(root);
- ruleQueue.phaseCompleted(phase);
- break PLANNING;
- }
-
- // The root may have been merged with another
- // subset. Find the new root subset.
- root = canonize(root);
- }
-
- ruleQueue.phaseCompleted(phase);
- }
-
- if (topDownOpt) {
- tasks.push(OptimizeTask.create(root));
- while (!tasks.isEmpty()) {
- OptimizeTask task = tasks.peek();
- if (task.hasSubTask()) {
- tasks.push(task.nextSubTask());
- continue;
- }
- task = tasks.pop();
- task.execute();
- }
- }
+ ruleDriver.drive();
if (LOGGER.isTraceEnabled()) {
StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);
dump(pw);
pw.flush();
- LOGGER.trace(sw.toString());
+ LOGGER.info(sw.toString());
}
dumpRuleAttemptsInfo();
RelNode cheapest = root.buildCheapestPlan(this);
@@ -776,11 +749,6 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
return set.getSubset(traits);
}
- boolean isSeedNode(RelNode node) {
- final RelSet set = getSubset(node).set;
- return set.seeds.contains(node);
- }
-
RelNode changeTraitsUsingConverters(
RelNode rel,
RelTraitSet toTraits) {
@@ -987,6 +955,13 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
}
/**
+ * Finds the new root subset in case the root has been merged with another subset.
+ */
+ void canonize() {
+ root = canonize(root);
+ }
+
+ /**
* If a subset has one or more equivalent subsets (owing to a set having
* merged with another), returns the subset which is the leader of the
* equivalence class.
@@ -1088,6 +1063,9 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
ensureRootConverters();
}
+ if (ruleDriver != null) {
+ ruleDriver.onSetMerged(set);
+ }
return set;
}
@@ -1258,8 +1236,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
final int subsetBeforeCount = set.subsets.size();
RelSubset subset = addRelToSet(rel, set);
- final RelNode xx = mapDigestToRel.put(digest, rel);
- assert xx == null || xx == rel : rel.getDigest();
+ final RelNode xx = mapDigestToRel.putIfAbsent(digest, rel);
LOGGER.trace("Register {} in {}", rel, subset);
@@ -1303,6 +1280,10 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
// ignore
}
+ if (ruleDriver != null) {
+ ruleDriver.onProduce(rel, subset);
+ }
+
return subset;
}
@@ -1397,6 +1378,79 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
this.locked = locked;
}
+ /**
+ * Decides whether a rel node is logical or not.
+ *
+ * <p>A rel counts as logical when it is not a {@code PhysicalNode} and its
+ * convention is not the convention of the root.
+ *
+ * @param rel The specific rel node
+ * @return True if the relnode is a logical node
+ */
+ @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+ public boolean isLogical(RelNode rel) {
+ return !(rel instanceof PhysicalNode)
+ && rel.getConvention() != rootConvention;
+ }
+
+ /**
+ * Checks whether a rule match is a substitute rule match, that is, whether
+ * the rule of the match is a {@code SubstitutionRule}.
+ *
+ * @param match The rule match to check
+ * @return True if the rule match is a substitute rule match
+ */
+ @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+ protected boolean isSubstituteRule(VolcanoRuleCall match) {
+ return match.getRule() instanceof SubstitutionRule;
+ }
+
+  /**
+   * Checks whether a rule match is a transformation rule match.
+   *
+   * <p>Matches of substitution rules always qualify. Matches of converter
+   * rules whose out trait is the root convention never qualify. Any other
+   * match qualifies when the trait of the rule's operand is
+   * {@code Convention.NONE} or {@code null}.
+   *
+   * @param match The rule match to check
+   * @return True if the rule match is a transformation rule match
+   */
+  @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+  protected boolean isTransformationRule(VolcanoRuleCall match) {
+    final boolean substitution = match.getRule() instanceof SubstitutionRule;
+    if (substitution) {
+      return true;
+    }
+    final boolean convertsToRoot = match.getRule() instanceof ConverterRule
+        && match.getRule().getOutTrait() == rootConvention;
+    if (convertsToRoot) {
+      return false;
+    }
+    final boolean matchesNone =
+        match.getRule().getOperand().trait == Convention.NONE;
+    return matchesNone || match.getRule().getOperand().trait == null;
+  }
+
+
+  /**
+   * Gets the lower bound cost of a relational operator.
+   *
+   * <p>The bound is asked of the metadata query of the rel's cluster; when
+   * that yields {@code null}, zero cost is used instead.
+   *
+   * @param rel The rel node
+   * @return The lower bound cost of the given rel. The value is ensured NOT NULL.
+   */
+  @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+  protected RelOptCost getLowerBound(RelNode rel) {
+    final RelOptCost bound =
+        rel.getCluster().getMetadataQuery().getLowerBoundCost(rel, this);
+    return bound == null ? zeroCost : bound;
+  }
+
+  /**
+   * Derives the cost upper bound for the inputs of {@code mExpr} from the
+   * upper bound of {@code mExpr} itself.
+   *
+   * <p>By default the non-cumulative cost of {@code mExpr} is subtracted from
+   * a finite bound; an infinite bound, or an infinite self cost, leaves the
+   * bound unchanged. Subclasses may override this method, as some
+   * implementations have a different cost model on some RelNodes, like Spool.
+   *
+   * @param mExpr the expression whose inputs are being bounded
+   * @param upperBound the upper bound that applies to {@code mExpr}
+   * @return the upper bound to apply to the inputs
+   */
+  @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+  protected RelOptCost upperBoundForInputs(
+      RelNode mExpr, RelOptCost upperBound) {
+    if (upperBound.isInfinite()) {
+      return upperBound;
+    }
+    final RelOptCost selfCost =
+        mExpr.getCluster().getMetadataQuery().getNonCumulativeCost(mExpr);
+    return selfCost.isInfinite() ? upperBound : upperBound.minus(selfCost);
+  }
+
//~ Inner Classes ----------------------------------------------------------
/**
@@ -1422,7 +1476,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
getOperand0(),
rels,
nodeInputs);
- volcanoPlanner.ruleQueue.addMatch(match);
+ volcanoPlanner.ruleDriver.getRuleQueue().addMatch(match);
}
}
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/BuiltInMetadata.java b/core/src/main/java/org/apache/calcite/rel/metadata/BuiltInMetadata.java
index 4cb3ba4..aada556 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/BuiltInMetadata.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/BuiltInMetadata.java
@@ -17,8 +17,8 @@
package org.apache.calcite.rel.metadata;
import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
@@ -622,6 +622,21 @@ public abstract class BuiltInMetadata {
}
}
+ /** Metadata to get the lower bound cost of a RelNode. */
+ public interface LowerBoundCost extends Metadata {
+ // Definition binding this metadata to its handler class and to the
+ // reflective method registered as BuiltInMethod.LOWER_BOUND_COST.
+ MetadataDef<LowerBoundCost> DEF = MetadataDef.of(LowerBoundCost.class,
+ LowerBoundCost.Handler.class, BuiltInMethod.LOWER_BOUND_COST.method);
+
+ /**
+ * Returns the lower bound cost of a RelNode.
+ *
+ * @param planner the Volcano planner on whose behalf the bound is requested
+ */
+ RelOptCost getLowerBoundCost(VolcanoPlanner planner);
+
+ /**
+ * Handler API. Same query as {@link LowerBoundCost#getLowerBoundCost},
+ * with the rel and the metadata query passed explicitly.
+ */
+ interface Handler extends MetadataHandler<LowerBoundCost> {
+ RelOptCost getLowerBoundCost(
+ RelNode r, RelMetadataQuery mq, VolcanoPlanner planner);
+ }
+ }
+
/** Metadata about the memory use of an operator. */
public interface Memory extends Metadata {
MetadataDef<Memory> DEF = MetadataDef.of(Memory.class,
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/DefaultRelMetadataProvider.java b/core/src/main/java/org/apache/calcite/rel/metadata/DefaultRelMetadataProvider.java
index 4631092..2729ab2 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/DefaultRelMetadataProvider.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/DefaultRelMetadataProvider.java
@@ -55,6 +55,7 @@ public class DefaultRelMetadataProvider extends ChainedRelMetadataProvider {
RelMdSize.SOURCE,
RelMdParallelism.SOURCE,
RelMdDistribution.SOURCE,
+ RelMdLowerBoundCost.SOURCE,
RelMdMemory.SOURCE,
RelMdDistinctRowCount.SOURCE,
RelMdSelectivity.SOURCE,
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdLowerBoundCost.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdLowerBoundCost.java
new file mode 100644
index 0000000..f7ea60a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdLowerBoundCost.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.metadata;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.BuiltInMetadata.LowerBoundCost;
+import org.apache.calcite.util.BuiltInMethod;
+
+/**
+ * Default handler for the {@link BuiltInMetadata.LowerBoundCost} metadata
+ * over the standard algebra.
+ */
+public class RelMdLowerBoundCost implements MetadataHandler<LowerBoundCost> {
+
+  public static final RelMetadataProvider SOURCE =
+      ReflectiveRelMetadataProvider.reflectiveSource(
+          new RelMdLowerBoundCost(), BuiltInMethod.LOWER_BOUND_COST.method);
+
+  //~ Constructors -----------------------------------------------------------
+
+  protected RelMdLowerBoundCost() {}
+
+  //~ Methods ----------------------------------------------------------------
+
+  public MetadataDef<LowerBoundCost> getDef() {
+    return LowerBoundCost.DEF;
+  }
+
+  /**
+   * Lower bound for a subset: the subset's winner cost, or {@code null} when
+   * the planner regards the subset as logical.
+   */
+  public RelOptCost getLowerBoundCost(RelSubset subset,
+      RelMetadataQuery mq, VolcanoPlanner planner) {
+    // Only physical subsets are supported for now; logical ones yield null.
+    return planner.isLogical(subset) ? null : subset.getWinnerCost();
+  }
+
+  /**
+   * Lower bound for a general rel: its own non-cumulative cost (ignored when
+   * infinite) plus the lower bound of every input that reports one. Yields
+   * {@code null} for a rel the planner regards as logical, and also when
+   * neither the rel nor any input contributes a cost.
+   */
+  public RelOptCost getLowerBoundCost(RelNode node,
+      RelMetadataQuery mq, VolcanoPlanner planner) {
+    // Only physical nodes are supported for now; logical ones yield null.
+    if (planner.isLogical(node)) {
+      return null;
+    }
+
+    final RelOptCost ownCost = mq.getNonCumulativeCost(node);
+    RelOptCost total = ownCost.isInfinite() ? null : ownCost;
+    for (RelNode child : node.getInputs()) {
+      final RelOptCost childBound = mq.getLowerBoundCost(child, planner);
+      if (childBound == null) {
+        continue;
+      }
+      total = total == null ? childBound : total.plus(childBound);
+    }
+    return total;
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataQuery.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataQuery.java
index aadc8e5..f27b085 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataQuery.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataQuery.java
@@ -19,6 +19,7 @@ package org.apache.calcite.rel.metadata;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPredicateList;
import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
@@ -97,6 +98,7 @@ public class RelMetadataQuery extends RelMetadataQueryBase {
private BuiltInMetadata.Selectivity.Handler selectivityHandler;
private BuiltInMetadata.Size.Handler sizeHandler;
private BuiltInMetadata.UniqueKeys.Handler uniqueKeysHandler;
+ private BuiltInMetadata.LowerBoundCost.Handler lowerBoundCostHandler;
/**
* Creates the instance with {@link JaninoRelMetadataProvider} instance
@@ -134,6 +136,7 @@ public class RelMetadataQuery extends RelMetadataQueryBase {
this.selectivityHandler = initialHandler(BuiltInMetadata.Selectivity.Handler.class);
this.sizeHandler = initialHandler(BuiltInMetadata.Size.Handler.class);
this.uniqueKeysHandler = initialHandler(BuiltInMetadata.UniqueKeys.Handler.class);
+ this.lowerBoundCostHandler = initialHandler(BuiltInMetadata.LowerBoundCost.Handler.class);
}
private RelMetadataQuery(JaninoRelMetadataProvider metadataProvider,
@@ -162,6 +165,7 @@ public class RelMetadataQuery extends RelMetadataQueryBase {
this.selectivityHandler = prototype.selectivityHandler;
this.sizeHandler = prototype.sizeHandler;
this.uniqueKeysHandler = prototype.uniqueKeysHandler;
+ this.lowerBoundCostHandler = prototype.lowerBoundCostHandler;
}
//~ Methods ----------------------------------------------------------------
@@ -843,4 +847,18 @@ public class RelMetadataQuery extends RelMetadataQueryBase {
}
}
}
+
+ /**
+ * Returns the lower bound cost of a RelNode.
+ *
+ * <p>The call is retried, after replacing the handler, whenever the current
+ * handler throws {@code NoHandler}.
+ *
+ * @param rel the relational expression
+ * @param planner the planner handed on to the metadata handler
+ * @return the value produced by the lower-bound-cost handler
+ */
+ public RelOptCost getLowerBoundCost(RelNode rel, VolcanoPlanner planner) {
+ for (;;) {
+ try {
+ return lowerBoundCostHandler.getLowerBoundCost(rel, this, planner);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ // Swap in the handler returned by revise(...) for e.relClass, then retry.
+ lowerBoundCostHandler =
+ revise(e.relClass, BuiltInMetadata.LowerBoundCost.DEF);
+ }
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/calcite/tools/Programs.java b/core/src/main/java/org/apache/calcite/tools/Programs.java
index 2432a45..7626fa8 100644
--- a/core/src/main/java/org/apache/calcite/tools/Programs.java
+++ b/core/src/main/java/org/apache/calcite/tools/Programs.java
@@ -251,8 +251,6 @@ public class Programs {
public static Program standard(RelMetadataProvider metadataProvider) {
final Program program1 =
(planner, rel, requiredOutputTraits, materializations, lattices) -> {
- planner.setRoot(rel);
-
for (RelOptMaterialization materialization : materializations) {
planner.addMaterialization(materialization);
}
@@ -260,6 +258,7 @@ public class Programs {
planner.addLattice(lattice);
}
+ planner.setRoot(rel);
final RelNode rootRel2 =
rel.getTraitSet().equals(requiredOutputTraits)
? rel
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 4f92c5f..e0ca291 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -51,6 +51,7 @@ import org.apache.calcite.linq4j.function.Predicate2;
import org.apache.calcite.linq4j.tree.FunctionExpression;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.metadata.BuiltInMetadata.AllPredicates;
import org.apache.calcite.rel.metadata.BuiltInMetadata.Collation;
import org.apache.calcite.rel.metadata.BuiltInMetadata.ColumnOrigin;
@@ -60,6 +61,7 @@ import org.apache.calcite.rel.metadata.BuiltInMetadata.DistinctRowCount;
import org.apache.calcite.rel.metadata.BuiltInMetadata.Distribution;
import org.apache.calcite.rel.metadata.BuiltInMetadata.ExplainVisibility;
import org.apache.calcite.rel.metadata.BuiltInMetadata.ExpressionLineage;
+import org.apache.calcite.rel.metadata.BuiltInMetadata.LowerBoundCost;
import org.apache.calcite.rel.metadata.BuiltInMetadata.MaxRowCount;
import org.apache.calcite.rel.metadata.BuiltInMetadata.Memory;
import org.apache.calcite.rel.metadata.BuiltInMetadata.MinRowCount;
@@ -553,6 +555,8 @@ public enum BuiltInMethod {
AVERAGE_COLUMN_SIZES(Size.class, "averageColumnSizes"),
IS_PHASE_TRANSITION(Parallelism.class, "isPhaseTransition"),
SPLIT_COUNT(Parallelism.class, "splitCount"),
+ LOWER_BOUND_COST(LowerBoundCost.class, "getLowerBoundCost",
+ VolcanoPlanner.class),
MEMORY(Memory.class, "memory"),
CUMULATIVE_MEMORY_WITHIN_PHASE(Memory.class, "cumulativeMemoryWithinPhase"),
CUMULATIVE_MEMORY_WITHIN_PHASE_SPLIT(Memory.class,
diff --git a/core/src/test/java/org/apache/calcite/plan/volcano/VolcanoPlannerTest.java b/core/src/test/java/org/apache/calcite/plan/volcano/VolcanoPlannerTest.java
index 8f200ec..9d1371f 100644
--- a/core/src/test/java/org/apache/calcite/plan/volcano/VolcanoPlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/plan/volcano/VolcanoPlannerTest.java
@@ -37,6 +37,7 @@ import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Pair;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -64,7 +65,7 @@ import static org.apache.calcite.test.Matchers.isLinux;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -501,12 +502,18 @@ class VolcanoPlannerTest {
// verify that the rule match cannot be popped,
// as the related node has been pruned
+ RuleQueue ruleQueue = planner.ruleDriver.getRuleQueue();
while (true) {
- VolcanoRuleMatch ruleMatch = planner.ruleQueue.popMatch(VolcanoPlannerPhase.OPTIMIZE);
+ VolcanoRuleMatch ruleMatch;
+ if (ruleQueue instanceof IterativeRuleQueue) {
+ ruleMatch = ((IterativeRuleQueue) ruleQueue).popMatch(VolcanoPlannerPhase.OPTIMIZE);
+ } else {
+ ruleMatch = ((TopDownRuleQueue) ruleQueue).popMatch(Pair.of(leafRel, null));
+ }
if (ruleMatch == null) {
break;
}
- assertFalse(ruleMatch.rels[0] == leafRel);
+ assertNotSame(leafRel, ruleMatch.rels[0]);
}
}
diff --git a/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java b/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
index d92a451..2b32b52 100644
--- a/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.test;
+import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.test.CalciteAssert.AssertThat;
import org.apache.calcite.test.CalciteAssert.DatabaseInstance;
@@ -179,6 +180,7 @@ class JdbcAdapterTest {
@Test void testPushDownSort() {
CalciteAssert.model(JdbcTest.SCOTT_MODEL)
+ .with(CalciteConnectionProperty.TOPDOWN_OPT.camelName(), false)
.query("select ename\n"
+ "from scott.emp\n"
+ "order by empno")
@@ -211,6 +213,7 @@ class JdbcAdapterTest {
+ "GROUP BY \"JOB\", \"DEPTNO\"\n"
+ "ORDER BY \"DEPTNO\" NULLS LAST, \"JOB\" NULLS LAST";
CalciteAssert.model(JdbcTest.SCOTT_MODEL)
+ .with(CalciteConnectionProperty.TOPDOWN_OPT.camelName(), false)
.query(sql)
.explainContains(explain)
.runs()
diff --git a/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml b/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml
index f2507fa..0be66a5 100644
--- a/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml
@@ -51,8 +51,8 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])
</Resource>
<Resource name="planAfter">
<![CDATA[
-EnumerableProject(MGR=[$3])
- EnumerableSort(sort0=[$3], dir0=[DESC-nulls-last])
+EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
+ EnumerableProject(MGR=[$3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
]]>
</Resource>
@@ -77,8 +77,8 @@ EnumerableSortedAggregate(group=[{0, 1, 2}])
EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC])
EnumerableProject(ENAME=[$0], EXPR$1=[*($2, -2)], MGR=[$1])
EnumerableLimit(fetch=[100])
- EnumerableProject(ENAME=[$1], MGR=[$3], SAL=[$5])
- EnumerableSort(sort0=[$1], sort1=[$3], sort2=[$5], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+ EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+ EnumerableProject(ENAME=[$1], MGR=[$3], SAL=[$5])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
]]>
</Resource>
@@ -147,8 +147,8 @@ LogicalSort(sort0=[$1], dir0=[DESC])
</Resource>
<Resource name="planAfter">
<![CDATA[
-EnumerableProject(DEPTNO=[$7], EXPR$1=[CAST($7):FLOAT NOT NULL])
- EnumerableSort(sort0=[$7], dir0=[DESC])
+EnumerableSort(sort0=[$1], dir0=[DESC])
+ EnumerableProject(DEPTNO=[$7], EXPR$1=[CAST($7):FLOAT NOT NULL])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
]]>
</Resource>
@@ -233,8 +233,8 @@ EnumerableProject(ENAME=[$0], JOB=[$1], EXPR$2=[$2], ENAME0=[$3], JOB0=[$4], SAL
EnumerableSortedAggregate(group=[{1, 2}], MAX_SAL=[MAX($5)])
EnumerableSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
- EnumerableProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], JOB0=[CAST($1):VARCHAR NOT NULL])
- EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+ EnumerableSort(sort0=[$4], sort1=[$0], dir0=[ASC], dir1=[ASC])
+ EnumerableProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], JOB0=[CAST($1):VARCHAR NOT NULL])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
</Resource>
@@ -267,8 +267,8 @@ EnumerableProject(ENAME=[$0], JOB=[$1], EXPR$2=[$2], ENAME0=[$3], JOB0=[$4], SAL
EnumerableLimit(fetch=[100])
EnumerableProject(ENAME=[$1], JOB=[$2], SAL=[$5])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
- EnumerableProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], JOB0=[CAST($1):VARCHAR NOT NULL])
- EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+ EnumerableSort(sort0=[$4], sort1=[$0], dir0=[ASC], dir1=[ASC])
+ EnumerableProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], JOB0=[CAST($1):VARCHAR NOT NULL])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
</Resource>
@@ -670,8 +670,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
<Resource name="planAfter">
<![CDATA[
EnumerableHashJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
- EnumerableProject(CONTACTNO=[$0], EMAIL=[$3])
- EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+ EnumerableSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
+ EnumerableProject(CONTACTNO=[$0], EMAIL=[$3])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
EnumerableProject(ACCTNO=[$0], TYPE=[$1])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
@@ -701,8 +701,8 @@ LogicalProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5
<![CDATA[
EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5], unit=[$6], COORD_NE=[ROW($7, ROW($8, $9))], ACCTNO=[$10], TYPE=[$11], BALANCE=[$12])
EnumerableHashJoin(condition=[AND(=($0, $10), =($3, $11))], joinType=[left])
- EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
- EnumerableSort(sort0=[$1], dir0=[DESC])
+ EnumerableSort(sort0=[$1], dir0=[DESC])
+ EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
]]>
@@ -730,8 +730,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
<Resource name="planAfter">
<![CDATA[
EnumerableHashJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
- EnumerableProject(CONTACTNO=[$0], EMAIL=[$3])
- EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+ EnumerableSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
+ EnumerableProject(CONTACTNO=[$0], EMAIL=[$3])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
EnumerableProject(ACCTNO=[$0], TYPE=[$1])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
@@ -790,8 +790,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
<![CDATA[
EnumerableHashJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])
EnumerableLimit(fetch=[10])
- EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
- EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+ EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+ EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
@@ -819,8 +819,8 @@ LogicalSort(sort0=[$2], dir0=[DESC])
<![CDATA[
EnumerableHashJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])
EnumerableLimit(fetch=[10])
- EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
- EnumerableSort(sort0=[$3], dir0=[DESC])
+ EnumerableSort(sort0=[$2], dir0=[DESC])
+ EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
@@ -849,8 +849,8 @@ LogicalSort(sort0=[$2], dir0=[ASC])
EnumerableSort(sort0=[$2], dir0=[ASC])
EnumerableHashJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])
EnumerableLimit(fetch=[10])
- EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
- EnumerableSort(sort0=[$3], dir0=[DESC])
+ EnumerableSort(sort0=[$2], dir0=[DESC])
+ EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
@@ -878,8 +878,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
<![CDATA[
EnumerableNestedLoopJoin(condition=[AND(>($0, $3), <($1, $4))], joinType=[inner])
EnumerableLimit(fetch=[10])
- EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
- EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+ EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+ EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
@@ -907,8 +907,8 @@ LogicalSort(sort0=[$2], dir0=[ASC])
<![CDATA[
EnumerableNestedLoopJoin(condition=[AND(>($0, $3), <($1, $4))], joinType=[inner])
EnumerableLimit(fetch=[10])
- EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
- EnumerableSort(sort0=[$3], dir0=[ASC])
+ EnumerableSort(sort0=[$2], dir0=[ASC])
+ EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
@@ -937,8 +937,8 @@ LogicalSort(sort0=[$2], dir0=[DESC])
EnumerableSort(sort0=[$2], dir0=[DESC])
EnumerableNestedLoopJoin(condition=[AND(>($0, $3), <($1, $4))], joinType=[inner])
EnumerableLimit(fetch=[10])
- EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
- EnumerableSort(sort0=[$3], dir0=[ASC])
+ EnumerableSort(sort0=[$2], dir0=[ASC])
+ EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
@@ -967,8 +967,8 @@ LogicalProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5
<![CDATA[
EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5], unit=[$6], COORD_NE=[ROW($7, ROW($8, $9))], ACCTNO=[$10], TYPE=[$11], BALANCE=[$12])
EnumerableNestedLoopJoin(condition=[AND(>($0, $10), <($3, $11))], joinType=[left])
- EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
- EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+ EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+ EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
]]>
@@ -997,8 +997,8 @@ LogicalProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5
<![CDATA[
EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5], unit=[$6], COORD_NE=[ROW($7, ROW($8, $9))], ACCTNO=[$10], TYPE=[$11], BALANCE=[$12])
EnumerableNestedLoopJoin(condition=[AND(>($0, $10), <($3, $11))], joinType=[left])
- EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
- EnumerableSort(sort0=[$1], dir0=[DESC])
+ EnumerableSort(sort0=[$1], dir0=[DESC])
+ EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
]]>
@@ -1054,8 +1054,8 @@ LogicalSort(sort0=[$2], sort1=[$3], dir0=[DESC], dir1=[DESC])
</Resource>
<Resource name="planAfter">
<![CDATA[
-EnumerableProject(CONTACTNO=[$0], EMAIL=[$3], ACCTNO=[$10], TYPE=[$11])
- EnumerableSort(sort0=[$10], sort1=[$11], dir0=[DESC], dir1=[DESC])
+EnumerableSort(sort0=[$2], sort1=[$3], dir0=[DESC], dir1=[DESC])
+ EnumerableProject(CONTACTNO=[$0], EMAIL=[$3], ACCTNO=[$10], TYPE=[$11])
EnumerableNestedLoopJoin(condition=[AND(>($0, $10), <($3, $11))], joinType=[right])
EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
@@ -1215,8 +1215,8 @@ LogicalProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5
<![CDATA[
EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5], unit=[$6], COORD_NE=[ROW($7, ROW($8, $9))], ACCTNO=[$10], TYPE=[$11], BALANCE=[$12])
EnumerableBatchNestedLoopJoin(condition=[AND(>($0, $10), <($3, $11))], joinType=[left], batchSize=[100])
- EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
- EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+ EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+ EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
EnumerableFilter(condition=[OR(AND(>($cor0.CONTACTNO, $0), <($cor0.EMAIL, $1)), AND(>($cor1.CONTACTNO, $0), <($cor1.EMAIL, $1)), AND(>($cor2.CONTACTNO, $0), <($cor2.EMAIL, $1)), AND(>($cor3.CONTACTNO, $0), <($cor3.EMAIL, $1)), AND(>($cor4.CONTACTNO, $0), <($cor4.EMAIL, $1)), AND(>($cor5.CONTACTNO, $0), <($cor5.EMAIL, $1)), AND(>($cor6.CONTACTNO, $0), <($cor6.EMAIL, $1)), AND(>($cor7.CONTACTNO, $0), <($cor7.EMAIL, $1)), AND(>($cor8.CONTACTNO, $0), <($cor8.EMAIL, $1)), AND(>($cor9.CONT [...]
EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
@@ -1245,8 +1245,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
<![CDATA[
EnumerableBatchNestedLoopJoin(condition=[AND(>($0, $3), <($1, $4))], joinType=[inner], batchSize=[100])
EnumerableLimit(fetch=[10])
- EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
- EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+ EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+ EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
EnumerableFilter(condition=[OR(AND(>($cor0.ENAME, $0), <($cor0.JOB, $1)), AND(>($cor1.ENAME, $0), <($cor1.JOB, $1)), AND(>($cor2.ENAME, $0), <($cor2.JOB, $1)), AND(>($cor3.ENAME, $0), <($cor3.JOB, $1)), AND(>($cor4.ENAME, $0), <($cor4.JOB, $1)), AND(>($cor5.ENAME, $0), <($cor5.JOB, $1)), AND(>($cor6.ENAME, $0), <($cor6.JOB, $1)), AND(>($cor7.ENAME, $0), <($cor7.JOB, $1)), AND(>($cor8.ENAME, $0), <($cor8.JOB, $1)), AND(>($cor9.ENAME, $0), <($cor9.JOB, $1)), AND(>($cor10.ENAME, $0), <($c [...]
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
@@ -1303,8 +1303,8 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])
</Resource>
<Resource name="planAfter">
<![CDATA[
-EnumerableCalc(expr#0..8=[{inputs}], MGR=[$t3])
- EnumerableSort(sort0=[$3], dir0=[DESC-nulls-last])
+EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
+ EnumerableCalc(expr#0..8=[{inputs}], MGR=[$t3])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
]]>
</Resource>
@@ -1359,8 +1359,8 @@ EnumerableSortedAggregate(group=[{0, 1, 2}])
EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[-2], expr#4=[*($t2, $t3)], ENAME=[$t0], EXPR$1=[$t4], MGR=[$t1])
EnumerableLimit(fetch=[100])
- EnumerableCalc(expr#0..8=[{inputs}], ENAME=[$t1], MGR=[$t3], SAL=[$t5])
- EnumerableSort(sort0=[$1], sort1=[$3], sort2=[$5], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+ EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+ EnumerableCalc(expr#0..8=[{inputs}], ENAME=[$t1], MGR=[$t3], SAL=[$t5])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
]]>
</Resource>
@@ -1393,8 +1393,8 @@ EnumerableCalc(expr#0..7=[{inputs}], proj#0..6=[{exprs}])
EnumerableLimit(fetch=[100])
EnumerableCalc(expr#0..8=[{inputs}], ENAME=[$t1], JOB=[$t2], SAL=[$t5])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
- EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):VARCHAR NOT NULL], proj#0..4=[{exprs}])
- EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+ EnumerableSort(sort0=[$4], sort1=[$0], dir0=[ASC], dir1=[ASC])
+ EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):VARCHAR NOT NULL], proj#0..4=[{exprs}])
EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
]]>
</Resource>
@@ -1451,8 +1451,8 @@ LogicalSort(sort0=[$1], dir0=[DESC])
</Resource>
<Resource name="planAfter">
<![CDATA[
-EnumerableCalc(expr#0..8=[{inputs}], expr#9=[CAST($t7):FLOAT NOT NULL], EXPR$0=[$t9], DEPTNO=[$t7])
- EnumerableSort(sort0=[$7], dir0=[DESC])
+EnumerableSort(sort0=[$1], dir0=[DESC])
+ EnumerableCalc(expr#0..8=[{inputs}], expr#9=[CAST($t7):FLOAT NOT NULL], EXPR$0=[$t9], DEPTNO=[$t7])
EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
]]>
</Resource>
diff --git a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
index d4781e1..bb9fa46 100644
--- a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
+++ b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.adapter.kafka;
+import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.test.CalciteAssert;
import com.google.common.io.Resources;
@@ -73,6 +74,7 @@ class KafkaAdapterTest {
@Test void testFilterWithProject() {
assertModel(MODEL)
+ .with(CalciteConnectionProperty.TOPDOWN_OPT.camelName(), false)
.query("SELECT STREAM MSG_PARTITION,MSG_OFFSET,MSG_VALUE_BYTES FROM KAFKA.MOCKTABLE"
+ " WHERE MSG_OFFSET>0")
.limit(1)