You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by hy...@apache.org on 2020/07/14 20:25:52 UTC

[calcite] branch master updated: [CALCITE-3916] Support top-down rule applying and upper bound space pruning

This is an automated email from the ASF dual-hosted git repository.

hyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new 33aa61c  [CALCITE-3916] Support top-down rule applying and upper bound space pruning
33aa61c is described below

commit 33aa61ca404018cc9fe8ad2ec2c02ba269c67ebe
Author: jinpeng.wjp <ji...@alibaba-inc.com>
AuthorDate: Tue May 26 17:49:41 2020 +0800

    [CALCITE-3916] Support top-down rule applying and upper bound space pruning
    
    Close #1991
---
 .../calcite/interpreter/BindableConvention.java    |   5 +
 .../org/apache/calcite/plan/RelOptRuleOperand.java |   2 +-
 .../calcite/plan/volcano/IterativeRuleDriver.java  |  88 ++
 .../{RuleQueue.java => IterativeRuleQueue.java}    |  84 +-
 .../apache/calcite/plan/volcano/OptimizeTask.java  | 372 ---------
 .../org/apache/calcite/plan/volcano/RelSet.java    |  67 +-
 .../org/apache/calcite/plan/volcano/RelSubset.java |  87 +-
 .../apache/calcite/plan/volcano/RuleDriver.java    |  53 ++
 .../org/apache/calcite/plan/volcano/RuleQueue.java | 280 +------
 .../calcite/plan/volcano/TopDownRuleDriver.java    | 898 +++++++++++++++++++++
 .../calcite/plan/volcano/TopDownRuleQueue.java     |  88 ++
 .../calcite/plan/volcano/VolcanoPlanner.java       | 196 +++--
 .../calcite/rel/metadata/BuiltInMetadata.java      |  17 +-
 .../rel/metadata/DefaultRelMetadataProvider.java   |   1 +
 .../calcite/rel/metadata/RelMdLowerBoundCost.java  |  77 ++
 .../calcite/rel/metadata/RelMetadataQuery.java     |  18 +
 .../java/org/apache/calcite/tools/Programs.java    |   3 +-
 .../org/apache/calcite/util/BuiltInMethod.java     |   4 +
 .../calcite/plan/volcano/VolcanoPlannerTest.java   |  13 +-
 .../org/apache/calcite/test/JdbcAdapterTest.java   |   3 +
 .../org/apache/calcite/test/TopDownOptTest.xml     |  92 +--
 .../calcite/adapter/kafka/KafkaAdapterTest.java    |   2 +
 22 files changed, 1577 insertions(+), 873 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/interpreter/BindableConvention.java b/core/src/main/java/org/apache/calcite/interpreter/BindableConvention.java
index bae22e9..31a4cba 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/BindableConvention.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/BindableConvention.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
 
 /**
  * Calling convention that returns results as an
@@ -51,6 +52,10 @@ public enum BindableConvention implements Convention {
     return "BINDABLE";
   }
 
+  @Override public RelNode enforce(RelNode input, RelTraitSet required) {
+    return null;
+  }
+
   public RelTraitDef getTraitDef() {
     return ConventionTraitDef.INSTANCE;
   }
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptRuleOperand.java b/core/src/main/java/org/apache/calcite/plan/RelOptRuleOperand.java
index 7515140..913db6f 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptRuleOperand.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptRuleOperand.java
@@ -47,7 +47,7 @@ public class RelOptRuleOperand {
   public int[] solveOrder;
   public int ordinalInParent;
   public int ordinalInRule;
-  private final RelTrait trait;
+  public final RelTrait trait;
   private final Class<? extends RelNode> clazz;
   private final ImmutableList<RelOptRuleOperand> children;
 
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleDriver.java b/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleDriver.java
new file mode 100644
index 0000000..3ff8300
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleDriver.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.trace.CalciteTrace;
+
+import org.slf4j.Logger;
+
+/***
+ * <p>The algorithm executes repeatedly in a series of phases. In each phase
+ * the exact rules that may be fired varies. The mapping of phases to rule
+ * sets is maintained in the {@link #ruleQueue}.
+ *
+ * <p>In each phase, the planner then iterates over the rule matches presented
+ * by the rule queue until the rule queue becomes empty.
+ */
+class IterativeRuleDriver implements RuleDriver {
+
+  private static final Logger LOGGER = CalciteTrace.getPlannerTracer();
+
+  private final VolcanoPlanner planner;
+  private final IterativeRuleQueue ruleQueue;
+
+  IterativeRuleDriver(VolcanoPlanner planner) {
+    this.planner = planner;
+    ruleQueue = new IterativeRuleQueue(planner);
+  }
+
+  @Override public IterativeRuleQueue getRuleQueue() {
+    return ruleQueue;
+  }
+
+  @Override public void drive() {
+    PLANNING:
+    for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
+      while (true) {
+        LOGGER.debug("PLANNER = {}; PHASE = {}; COST = {}",
+            this, phase.toString(), planner.root.bestCost);
+
+        VolcanoRuleMatch match = ruleQueue.popMatch(phase);
+        if (match == null) {
+          break;
+        }
+
+        assert match.getRule().matches(match);
+        try {
+          match.onMatch();
+        } catch (VolcanoTimeoutException e) {
+          LOGGER.warn("Volcano planning times out, cancels the subsequent optimization.");
+          planner.canonize();
+          ruleQueue.phaseCompleted(phase);
+          break PLANNING;
+        }
+
+        // The root may have been merged with another
+        // subset. Find the new root subset.
+        planner.canonize();
+      }
+
+      ruleQueue.phaseCompleted(phase);
+    }
+  }
+
+  @Override public void onProduce(RelNode rel, RelSubset subset) {
+  }
+
+  @Override public void onSetMerged(RelSet set) {
+  }
+
+  @Override public void clear() {
+    ruleQueue.clear();
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java b/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleQueue.java
similarity index 78%
copy from core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
copy to core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleQueue.java
index 06b9587..531df45 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/IterativeRuleQueue.java
@@ -16,10 +16,7 @@
  */
 package org.apache.calcite.plan.volcano;
 
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.rules.SubstitutionRule;
-import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
 
 import com.google.common.collect.HashMultimap;
@@ -30,8 +27,6 @@ import org.slf4j.Logger;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayDeque;
-import java.util.Deque;
 import java.util.EnumMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -43,7 +38,7 @@ import java.util.Set;
  * Priority queue of relexps whose rules have not been called, and rule-matches
  * which have not yet been acted upon.
  */
-class RuleQueue {
+class IterativeRuleQueue extends RuleQueue {
   //~ Static fields/initializers ---------------------------------------------
 
   private static final Logger LOGGER = CalciteTrace.getPlannerTracer();
@@ -63,8 +58,6 @@ class RuleQueue {
   final Map<VolcanoPlannerPhase, PhaseMatchList> matchListMap =
       new EnumMap<>(VolcanoPlannerPhase.class);
 
-  private final VolcanoPlanner planner;
-
   /**
    * Maps a {@link VolcanoPlannerPhase} to a set of rule descriptions. Named rules
    * may be invoked in their corresponding phase.
@@ -76,8 +69,8 @@ class RuleQueue {
 
   //~ Constructors -----------------------------------------------------------
 
-  RuleQueue(VolcanoPlanner planner) {
-    this.planner = planner;
+  IterativeRuleQueue(VolcanoPlanner planner) {
+    super(planner);
 
     phaseRuleMapping = new EnumMap<>(VolcanoPlannerPhase.class);
 
@@ -106,10 +99,15 @@ class RuleQueue {
   /**
    * Clear internal data structure for this rule queue.
    */
-  public void clear() {
+  @Override public boolean clear() {
+    boolean empty = true;
     for (PhaseMatchList matchList : matchListMap.values()) {
+      if (!matchList.queue.isEmpty() || !matchList.preQueue.isEmpty()) {
+        empty = false;
+      }
       matchList.clear();
     }
+    return !empty;
   }
 
   /**
@@ -125,7 +123,7 @@ class RuleQueue {
    * existing {@link PhaseMatchList per-phase rule-match lists} which allow
    * the rule referenced by the match.
    */
-  void addMatch(VolcanoRuleMatch match) {
+  public void addMatch(VolcanoRuleMatch match) {
     final String matchName = match.toString();
     for (PhaseMatchList matchList : matchListMap.values()) {
       Set<String> phaseRuleSet = phaseRuleMapping.get(matchList.phase);
@@ -163,7 +161,7 @@ class RuleQueue {
    *                              previously marked as completed via
    *                              {@link #phaseCompleted(VolcanoPlannerPhase)}.
    */
-  VolcanoRuleMatch popMatch(VolcanoPlannerPhase phase) {
+  public VolcanoRuleMatch popMatch(VolcanoPlannerPhase phase) {
     dumpPlannerState();
 
     PhaseMatchList phaseMatchList = matchListMap.get(phase);
@@ -233,66 +231,6 @@ class RuleQueue {
     }
   }
 
-  /** Returns whether to skip a match. This happens if any of the
-   * {@link RelNode}s have importance zero. */
-  private boolean skipMatch(VolcanoRuleMatch match) {
-    for (RelNode rel : match.rels) {
-      if (planner.prunedNodes.contains(rel)) {
-        return true;
-      }
-    }
-
-    // If the same subset appears more than once along any path from root
-    // operand to a leaf operand, we have matched a cycle. A relational
-    // expression that consumes its own output can never be implemented, and
-    // furthermore, if we fire rules on it we may generate lots of garbage.
-    // For example, if
-    //   Project(A, X = X + 0)
-    // is in the same subset as A, then we would generate
-    //   Project(A, X = X + 0 + 0)
-    //   Project(A, X = X + 0 + 0 + 0)
-    // also in the same subset. They are valid but useless.
-    final Deque<RelSubset> subsets = new ArrayDeque<>();
-    try {
-      checkDuplicateSubsets(subsets, match.rule.getOperand(), match.rels);
-    } catch (Util.FoundOne e) {
-      return true;
-    }
-    return false;
-  }
-
-  /** Recursively checks whether there are any duplicate subsets along any path
-   * from root of the operand tree to one of the leaves.
-   *
-   * <p>It is OK for a match to have duplicate subsets if they are not on the
-   * same path. For example,
-   *
-   * <blockquote><pre>
-   *   Join
-   *  /   \
-   * X     X
-   * </pre></blockquote>
-   *
-   * <p>is a valid match.
-   *
-   * @throws org.apache.calcite.util.Util.FoundOne on match
-   */
-  private void checkDuplicateSubsets(Deque<RelSubset> subsets,
-      RelOptRuleOperand operand, RelNode[] rels) {
-    final RelSubset subset = planner.getSubset(rels[operand.ordinalInRule]);
-    if (subsets.contains(subset)) {
-      throw Util.FoundOne.NULL;
-    }
-    if (!operand.getChildOperands().isEmpty()) {
-      subsets.push(subset);
-      for (RelOptRuleOperand childOperand : operand.getChildOperands()) {
-        checkDuplicateSubsets(subsets, childOperand, rels);
-      }
-      final RelSubset x = subsets.pop();
-      assert x == subset;
-    }
-  }
-
   //~ Inner Classes ----------------------------------------------------------
 
   /**
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java b/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java
deleted file mode 100644
index c72e2b8..0000000
--- a/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.plan.volcano;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.DeriveMode;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.util.trace.CalciteTrace;
-
-import org.apiguardian.api.API;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * <code>OptimizeTask</code> represents the optimization task
- * of VolcanoPlanner.
- *
- * <p>How does it work?</p>
- *
- * <p>Let S# denote the seed physical operator in a RelSet after
- * logical and physical rules transformation, P# denote the
- * physical operator generated by passing down parent trait
- * requirements, D# denote the physical operator generated by
- * deriving from child delivered traitSets.</p>
- *
- * The initial rel list state in a RelSet is as follows:
- * <pre>
- *  cursor
- *    |
- *    V
- *   S1, S2
- * </pre>
- *
- * When we create a task for RelSubset1, the task will immediately
- * pass the subset's traitSet to seed operators, S1 and S2,
- * now we have:
- * <pre>
- *  cursor
- *    |
- *    V
- *   S1, S2, P1, P2
- * </pre>
- *
- * The subset task will create a optimization task for the relnode
- * pointed by cursor, and move cursor to next available physical
- * operator S2. In the task for S1, it will continue optimize its
- * child nodes, which are RelSubsets. After child inputs optimization
- * is finished, S1 will derive new relnodes from delivered subsets
- * in input RelSet. Once task for S1 is completed, we have:
- * <pre>
- *      cursor
- *        |
- *        V
- *   S1, S2, P1, P2, D1
- * </pre>
- *
- * The subset task continues scheduling task for S2, P1... until
- * there is no more relnode created for the RelSet, then we have:
- * <pre>
- *                              cursor
- *                                |
- *                                V
- *   S1, S2, P1, P2, D1, D2, D3, null
- * </pre>
- *
- * When a task for another RelSubset2 is created, the task will try
- * to pass down the subset's traitSet to seed operator S1 and S2,
- * now the RelSet looks like:
- * <pre>
- *                              cursor
- *                                |
- *                                V
- *   S1, S2, P1, P2, D1, D2, D3, P3, P4
- * </pre>
- *
- * The process continues till there is no more subsets or relnodes
- * created for the RelSet.
- */
-@API(since = "1.23", status = API.Status.INTERNAL)
-abstract class OptimizeTask {
-
-  static final Logger LOGGER = CalciteTrace.getPlannerTaskTracer();
-
-  static OptimizeTask create(RelNode node) {
-    if (node instanceof RelSubset) {
-      return new RelSubsetOptTask((RelSubset) node);
-    }
-    return new RelNodeOptTask(node);
-  }
-
-  final VolcanoPlanner planner;
-  final int id;
-
-  OptimizeTask(RelNode node) {
-    planner = (VolcanoPlanner) node.getCluster().getPlanner();
-    id = planner.nextTaskId++;
-    LOGGER.debug("Scheduled task(id={}) for {}", id, node);
-  }
-
-  abstract boolean hasSubTask();
-
-  abstract OptimizeTask nextSubTask();
-
-  abstract void execute();
-
-  /**
-   * Task State
-   */
-  public enum State {
-    SCHEDULED,
-    EXECUTING,
-    COMPLETED
-  }
-
-  /**
-   * Task for optimizing RelNode.
-   *
-   * <p>Optimize input RelSubsets and derive new RelNodes
-   * from child traitSets.</p>
-   */
-  static class RelNodeOptTask extends OptimizeTask {
-    final RelNode node;
-    int nextId = 0; // next child index
-
-    RelNodeOptTask(RelNode node) {
-      super(node);
-      this.node = node;
-    }
-
-    @Override boolean hasSubTask() {
-      int size = node.getInputs().size();
-      while (nextId < size) {
-        RelSubset subset = (RelSubset) node.getInput(nextId);
-        if (subset.taskState == null) {
-          // not yet scheduled
-          return true;
-        } else {
-          // maybe a cycle if it is not completed
-          nextId++;
-        }
-      }
-
-      return false;
-    }
-
-    @Override OptimizeTask nextSubTask() {
-      RelNode child = node.getInput(nextId++);
-      return new RelSubsetOptTask((RelSubset) child);
-    }
-
-    @Override void execute() {
-      if (!(node instanceof PhysicalNode)
-          || ((PhysicalNode) node).getDeriveMode() == DeriveMode.PROHIBITED
-          || !planner.isSeedNode(node)) {
-        LOGGER.debug("Completed task(id={}) for {}", id, node);
-        return;
-      }
-
-      PhysicalNode rel = (PhysicalNode) node;
-      DeriveMode mode = rel.getDeriveMode();
-      int arity = node.getInputs().size();
-      // for OMAKASE
-      List<List<RelTraitSet>> inputTraits = new ArrayList<>(arity);
-
-      for (int i = 0; i < arity; i++) {
-        int childId = i;
-        if (mode == DeriveMode.RIGHT_FIRST) {
-          childId = arity - i - 1;
-        }
-
-        RelSubset input = (RelSubset) node.getInput(childId);
-        List<RelTraitSet> traits = new ArrayList<>();
-        inputTraits.add(traits);
-
-        final int numSubset = input.set.subsets.size();
-        for (int j = 0; j < numSubset; j++) {
-          RelSubset subset = input.set.subsets.get(j);
-          if (!subset.isDelivered() || equalsSansConvention(
-              subset.getTraitSet(), rel.getCluster().traitSet())) {
-            // Ideally we should stop deriving new relnodes when the
-            // subset's traitSet equals with input traitSet, but
-            // in case someone manually builds a physical relnode
-            // tree, which is highly discouraged, without specifying
-            // correct traitSet, e.g.
-            //   EnumerableFilter  [].ANY
-            //       -> EnumerableMergeJoin  [a].Hash[a]
-            // We should still be able to derive the correct traitSet
-            // for the dumb filter, even though the filter's traitSet
-            // should be derived from the MergeJoin when it is created.
-            // But if the subset's traitSet equals with the default
-            // empty traitSet sans convention (the default traitSet
-            // from cluster may have logical convention, NONE, which
-            // is not interesting), we are safe to ignore it, because
-            // a physical filter with non default traitSet, but has a
-            // input with default empty traitSet, e.g.
-            //   EnumerableFilter  [a].Hash[a]
-            //       -> EnumerableProject  [].ANY
-            // is definitely wrong, we should fail fast.
-            continue;
-          }
-
-          if (mode == DeriveMode.OMAKASE) {
-            traits.add(subset.getTraitSet());
-          } else {
-            RelNode newRel = rel.derive(subset.getTraitSet(), childId);
-            if (newRel != null && !planner.isRegistered(newRel)) {
-              RelNode newInput = newRel.getInput(childId);
-              assert newInput instanceof RelSubset;
-              if (newInput == subset) {
-                // If the child subset is used to derive new traits for
-                // current relnode, the subset will be marked REQUIRED
-                // when registering the new derived relnode and later
-                // will add enforcers between other delivered subsets.
-                // e.g. a MergeJoin request both inputs hash distributed
-                // by [a,b] sorted by [a,b]. If the left input R1 happens to
-                // be distributed by [a], the MergeJoin can derive new
-                // traits from this input and request both input to be
-                // distributed by [a] sorted by [a,b]. In case there is a
-                // alternative R2 with ANY distribution in the left input's
-                // RelSet, we may end up with requesting hash distribution
-                // [a] on alternative R2, which is unnecessary and waste,
-                // because we request distribution by [a] because of R1 can
-                // deliver the exact same distribution and we don't need to
-                // enforce properties on other subsets that can't satisfy
-                // the specific trait requirement.
-                // Here we add a constraint that {@code newInput == subset},
-                // because if the delivered child subset is HASH[a], but
-                // we require HASH[a].SORT[a,b], we still need to enable
-                // property enforcement on the required subset. Otherwise,
-                // we need to restrict enforcement between HASH[a].SORT[a,b]
-                // and HASH[a] only, which will make things a little bit
-                // complicated. We might optimize it in the future.
-                subset.disableEnforcing();
-              }
-              RelSubset relSubset = planner.register(newRel, node);
-              assert relSubset.set == planner.getSubset(node).set;
-            }
-          }
-        }
-
-        if (mode == DeriveMode.LEFT_FIRST
-            || mode == DeriveMode.RIGHT_FIRST) {
-          break;
-        }
-      }
-
-      if (mode == DeriveMode.OMAKASE) {
-        List<RelNode> relList = rel.derive(inputTraits);
-        for (RelNode relNode : relList) {
-          if (!planner.isRegistered(relNode)) {
-            planner.register(relNode, node);
-          }
-        }
-      }
-
-      LOGGER.debug("Completed task(id={}) for {}", id, node);
-    }
-
-    /**
-     * Returns whether the 2 traitSets are equal without Convention.
-     * It assumes they have the same traitDefs order.
-     */
-    private boolean equalsSansConvention(RelTraitSet ts1, RelTraitSet ts2) {
-      assert ts1.size() == ts2.size();
-      for (int i = 0; i < ts1.size(); i++) {
-        RelTrait trait = ts1.getTrait(i);
-        if (trait.getTraitDef() == ConventionTraitDef.INSTANCE) {
-          continue;
-        }
-        if (!trait.equals(ts2.getTrait(i))) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    @Override public String toString() {
-      return "Task#" + id + ":{ " + node + " }";
-    }
-  }
-
-  /**
-   * Task for optimizing RelSubset.
-   *
-   * <p>Pass down the trait requirements of current RelSubset
-   * and add enforcers to the new delivered subsets.</p>
-   */
-  static class RelSubsetOptTask extends OptimizeTask {
-    final RelSubset subset;
-    final Set<RelSubset> deliveredSubsets = new HashSet<>();
-
-    RelSubsetOptTask(RelSubset subset) {
-      super(subset);
-      this.subset = subset;
-      subset.taskState = State.SCHEDULED;
-      propagateTraits();
-    }
-
-    private void propagateTraits() {
-      int size = Math.min(subset.set.getSeedSize(),
-          subset.set.rels.size());
-
-      for (int i = 0; i < size; i++) {
-        RelNode rel = subset.set.rels.get(i);
-        if (!(rel instanceof PhysicalNode)
-            || rel.getConvention() == Convention.NONE
-            || rel.getTraitSet().satisfies(subset.getTraitSet())) {
-          continue;
-        }
-
-        RelNode node = ((PhysicalNode) rel).passThrough(
-            subset.getTraitSet());
-        if (node != null && !planner.isRegistered(node)) {
-          RelSubset newSubset = planner.register(node, subset);
-          deliveredSubsets.add(newSubset);
-        } else {
-          // TODO: should we consider stop trying propagation on node
-          // with the same traitset as phyNode?
-          assert true;
-        }
-      }
-    }
-
-    @Override boolean hasSubTask() {
-      return subset.set.hasNextPhysicalNode();
-    }
-
-    @Override OptimizeTask nextSubTask() {
-      RelNode rel = subset.set.nextPhysicalNode();
-      return new RelNodeOptTask(rel);
-    }
-
-    @Override void execute() {
-      subset.taskState = State.EXECUTING;
-
-      subset.set.addConverters(subset, true, false);
-
-      for (RelSubset delivered : deliveredSubsets) {
-        subset.set.addConverters(delivered, false, false);
-      }
-
-      subset.taskState = State.COMPLETED;
-      LOGGER.debug("Completed task(id={}) for {}", id, subset);
-    }
-
-    @Override public String toString() {
-      return "Task#" + id + ":{ " + subset + " }";
-    }
-  }
-}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java b/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
index 5d20104..bf4fe5d 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
@@ -23,7 +23,6 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.Converter;
 import org.apache.calcite.rel.core.CorrelationId;
@@ -76,15 +75,9 @@ class RelSet {
   RelNode rel;
 
   /**
-   * The position indicator of rel node that is to be processed.
+   * Exploring state of current RelSet.
    */
-  private int relCursor = 0;
-
-  /**
-   * The relnodes after applying logical rules and physical rules,
-   * before trait propagation and enforcement.
-   */
-  final Set<RelNode> seeds = new HashSet<>();
+  ExploringState exploringState;
 
   /**
    * Records conversions / enforcements that have happened on the
@@ -166,32 +159,6 @@ class RelSet {
     return null;
   }
 
-  public int getSeedSize() {
-    if (seeds.isEmpty()) {
-      seeds.addAll(rels);
-    }
-    return seeds.size();
-  }
-
-  public boolean hasNextPhysicalNode() {
-    while (relCursor < rels.size()) {
-      RelNode node = rels.get(relCursor);
-      if (node instanceof PhysicalNode
-          && node.getConvention() != Convention.NONE) {
-        // enforcer may be manually created for some reason
-        if (relCursor < getSeedSize() || !node.isEnforcer()) {
-          return true;
-        }
-      }
-      relCursor++;
-    }
-    return false;
-  }
-
-  public RelNode nextPhysicalNode() {
-    return rels.get(relCursor++);
-  }
-
   /**
    * Removes all references to a specific {@link RelNode} in both the subsets
    * and their parent relationships.
@@ -315,8 +282,8 @@ class RelSet {
       subset.setDelivered();
     }
 
-    if (needsConverter && !planner.topDownOpt) {
-      addConverters(subset, required, true);
+    if (needsConverter) {
+      addConverters(subset, required, !planner.topDownOpt);
     }
 
     return subset;
@@ -410,6 +377,13 @@ class RelSet {
         subset = getOrCreateSubset(cluster, otherTraits, true);
       }
 
+      assert subset != null;
+      if (subset.passThroughCache == null) {
+        subset.passThroughCache = otherSubset.passThroughCache;
+      } else if (otherSubset.passThroughCache != null) {
+        subset.passThroughCache.addAll(otherSubset.passThroughCache);
+      }
+
       // collect RelSubset instances, whose best should be changed
       if (otherSubset.bestCost.isLt(subset.bestCost)) {
         changedSubsets.put(subset, otherSubset.best);
@@ -488,4 +462,23 @@ class RelSet {
       planner.fireRules(subset);
     }
   }
+
+  //~ Inner Classes ----------------------------------------------------------
+
+  /**
+   * An enum representing exploring state of current RelSet.
+   */
+  enum ExploringState {
+    /**
+     * The RelSet is exploring.
+     * It means all possible rule matches are scheduled, but not fully applied.
+     * This RelSet will refuse to explore again, but cannot provide a valid LB.
+     */
+    EXPLORING,
+
+    /**
+     * The RelSet is fully explored and is able to provide a valid LB.
+     */
+    EXPLORED
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java b/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
index afecdce..ad5bda8 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
@@ -37,6 +38,8 @@ import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
 
+import com.google.common.collect.Sets;
+
 import org.apiguardian.api.API;
 import org.slf4j.Logger;
 
@@ -84,7 +87,7 @@ public class RelSubset extends AbstractRelNode {
   /**
    * Optimization task state
    */
-  OptimizeTask.State taskState;
+  OptimizeState taskState;
 
   /**
    * cost of best known plan (it may have improved since)
@@ -128,6 +131,17 @@ public class RelSubset extends AbstractRelNode {
    */
   private boolean enforceDisabled = false;
 
+  /**
+   * The upper bound of the last OptimizeGroup call.
+   */
+  RelOptCost upperBound;
+
+  /**
+   * A cache that recognize which RelNode has invoked the passThrough method
+   * so as to avoid duplicate invocation.
+   */
+  Set<RelNode> passThroughCache;
+
   //~ Constructors -----------------------------------------------------------
 
   RelSubset(
@@ -138,6 +152,7 @@ public class RelSubset extends AbstractRelNode {
     this.set = set;
     assert traits.allSimple();
     computeBestCost(cluster.getPlanner());
+    upperBound = bestCost;
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -414,6 +429,7 @@ public class RelSubset extends AbstractRelNode {
 
         bestCost = cost;
         best = rel;
+        upperBound = bestCost;
         // since best was changed, cached metadata for this subset should be removed
         mq.clearCache(this);
 
@@ -489,6 +505,70 @@ public class RelSubset extends AbstractRelNode {
       .filter(s -> traitSet.satisfies(s.getTraitSet()));
   }
 
+  /**
+   * Returns the best cost if this subset is fully optimized
+   * or null if the subset is not fully optimized.
+   */
+  @API(since = "1.24", status = API.Status.INTERNAL)
+  public RelOptCost getWinnerCost() {
+    if (taskState == OptimizeState.COMPLETED && bestCost.isLe(upperBound)) {
+      return bestCost;
+    }
+    // if bestCost != upperBound, it means optimize failed
+    return null;
+  }
+
+  void startOptimize(RelOptCost ub) {
+    assert getWinnerCost() == null : this + " is already optimized";
+    if (upperBound.isLt(ub)) {
+      upperBound = ub;
+      if (bestCost.isLt(upperBound)) {
+        upperBound = bestCost;
+      }
+    }
+    taskState = OptimizeState.OPTIMIZING;
+  }
+
+  void setOptimized() {
+    taskState = OptimizeState.COMPLETED;
+  }
+
+  boolean resetTaskState() {
+    boolean optimized = taskState != null;
+    taskState = null;
+    upperBound = bestCost;
+    return optimized;
+  }
+
+  RelNode passThrough(RelNode rel) {
+    if (!(rel instanceof PhysicalNode)) {
+      return null;
+    }
+    if (passThroughCache == null) {
+      passThroughCache = Sets.newIdentityHashSet();
+      passThroughCache.add(rel);
+    } else if (!passThroughCache.add(rel)) {
+      return null;
+    }
+    return ((PhysicalNode) rel).passThrough(this.getTraitSet());
+  }
+
+  boolean isExplored() {
+    return set.exploringState == RelSet.ExploringState.EXPLORED;
+  }
+
+  boolean explore() {
+    if (set.exploringState != null) {
+      return false;
+    }
+    set.exploringState = RelSet.ExploringState.EXPLORING;
+    return true;
+  }
+
+  void setExplored() {
+    set.exploringState = RelSet.ExploringState.EXPLORED;
+  }
+
   //~ Inner Classes ----------------------------------------------------------
 
   /**
@@ -700,4 +780,9 @@ public class RelSubset extends AbstractRelNode {
       return p;
     }
   }
+
+  enum OptimizeState {
+    OPTIMIZING,
+    COMPLETED
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RuleDriver.java b/core/src/main/java/org/apache/calcite/plan/volcano/RuleDriver.java
new file mode 100644
index 0000000..f32279a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RuleDriver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * A rule driver applies rules with designed algorithms.
+ */
+interface RuleDriver {
+
+  /**
+   * Gets the rule queue.
+   */
+  RuleQueue getRuleQueue();
+
+  /**
+   * Apply rules.
+   */
+  void drive();
+
+  /**
+   * Callback when new RelNodes are added into RelSet.
+   * @param rel the new RelNode
+   * @param subset subset to add
+   */
+  void onProduce(RelNode rel, RelSubset subset);
+
+  /**
+   * Callback when RelSets are merged.
+   * @param set the merged result set
+   */
+  void onSetMerged(RelSet set);
+
+  /**
+   * Clear this RuleDriver.
+   */
+  void clear();
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java b/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
index 06b9587..ac683cb 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RuleQueue.java
@@ -18,224 +18,41 @@ package org.apache.calcite.plan.volcano;
 
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.rules.SubstitutionRule;
 import org.apache.calcite.util.Util;
-import org.apache.calcite.util.trace.CalciteTrace;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-
-import org.slf4j.Logger;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.EnumMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
 
 /**
- * Priority queue of relexps whose rules have not been called, and rule-matches
- * which have not yet been acted upon.
+ * A data structure that manages rule matches for RuleDriver.
+ * Different RuleDriver requires different ways to pop matches,
+ * thus different ways to store rule matches that are not called.
  */
-class RuleQueue {
-  //~ Static fields/initializers ---------------------------------------------
-
-  private static final Logger LOGGER = CalciteTrace.getPlannerTracer();
-
-  private static final Set<String> ALL_RULES = ImmutableSet.of("<ALL RULES>");
-
-  //~ Instance fields --------------------------------------------------------
-
-  /**
-   * Map of {@link VolcanoPlannerPhase} to a list of rule-matches. Initially,
-   * there is an empty {@link PhaseMatchList} for each planner phase. As the
-   * planner invokes {@link #addMatch(VolcanoRuleMatch)} the rule-match is
-   * added to the appropriate PhaseMatchList(s). As the planner completes
-   * phases, the matching entry is removed from this list to avoid unused
-   * work.
-   */
-  final Map<VolcanoPlannerPhase, PhaseMatchList> matchListMap =
-      new EnumMap<>(VolcanoPlannerPhase.class);
-
-  private final VolcanoPlanner planner;
-
-  /**
-   * Maps a {@link VolcanoPlannerPhase} to a set of rule descriptions. Named rules
-   * may be invoked in their corresponding phase.
-   *
-   * <p>See {@link VolcanoPlannerPhaseRuleMappingInitializer} for more
-   * information regarding the contents of this Map and how it is initialized.
-   */
-  private final Map<VolcanoPlannerPhase, Set<String>> phaseRuleMapping;
+public abstract class RuleQueue {
 
-  //~ Constructors -----------------------------------------------------------
+  protected final VolcanoPlanner planner;
 
-  RuleQueue(VolcanoPlanner planner) {
+  protected RuleQueue(VolcanoPlanner planner) {
     this.planner = planner;
-
-    phaseRuleMapping = new EnumMap<>(VolcanoPlannerPhase.class);
-
-    // init empty sets for all phases
-    for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
-      phaseRuleMapping.put(phase, new HashSet<>());
-    }
-
-    // configure phases
-    planner.getPhaseRuleMappingInitializer().initialize(phaseRuleMapping);
-
-    for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
-      // empty phases get converted to "all rules"
-      if (phaseRuleMapping.get(phase).isEmpty()) {
-        phaseRuleMapping.put(phase, ALL_RULES);
-      }
-
-      // create a match list data structure for each phase
-      PhaseMatchList matchList = new PhaseMatchList(phase);
-
-      matchListMap.put(phase, matchList);
-    }
   }
 
-  //~ Methods ----------------------------------------------------------------
   /**
-   * Clear internal data structure for this rule queue.
-   */
-  public void clear() {
-    for (PhaseMatchList matchList : matchListMap.values()) {
-      matchList.clear();
-    }
-  }
-
-  /**
-   * Removes the {@link PhaseMatchList rule-match list} for the given planner
-   * phase.
-   */
-  public void phaseCompleted(VolcanoPlannerPhase phase) {
-    matchListMap.get(phase).clear();
-  }
-
-  /**
-   * Adds a rule match. The rule-matches are automatically added to all
-   * existing {@link PhaseMatchList per-phase rule-match lists} which allow
-   * the rule referenced by the match.
-   */
-  void addMatch(VolcanoRuleMatch match) {
-    final String matchName = match.toString();
-    for (PhaseMatchList matchList : matchListMap.values()) {
-      Set<String> phaseRuleSet = phaseRuleMapping.get(matchList.phase);
-      if (phaseRuleSet != ALL_RULES) {
-        String ruleDescription = match.getRule().toString();
-        if (!phaseRuleSet.contains(ruleDescription)) {
-          continue;
-        }
-      }
-
-      if (!matchList.names.add(matchName)) {
-        // Identical match has already been added.
-        continue;
-      }
-
-      LOGGER.trace("{} Rule-match queued: {}", matchList.phase.toString(), matchName);
-
-      matchList.offer(match);
-
-      matchList.matchMap.put(
-          planner.getSubset(match.rels[0]), match);
-    }
-  }
-
-  /**
-   * Removes the rule match from the head of match list, and returns it.
-   *
-   * <p>Returns {@code null} if there are no more matches.</p>
-   *
-   * <p>Note that the VolcanoPlanner may still decide to reject rule matches
-   * which have become invalid, say if one of their operands belongs to an
-   * obsolete set or has importance=0.
-   *
-   * @throws java.lang.AssertionError if this method is called with a phase
-   *                              previously marked as completed via
-   *                              {@link #phaseCompleted(VolcanoPlannerPhase)}.
+   * Add a RuleMatch into the queue.
+   * @param match rule match to add
    */
-  VolcanoRuleMatch popMatch(VolcanoPlannerPhase phase) {
-    dumpPlannerState();
-
-    PhaseMatchList phaseMatchList = matchListMap.get(phase);
-    if (phaseMatchList == null) {
-      throw new AssertionError("Used match list for phase " + phase
-          + " after phase complete");
-    }
-
-    VolcanoRuleMatch match;
-    for (;;) {
-      if (phaseMatchList.size() == 0) {
-        return null;
-      }
-
-      dumpRuleQueue(phaseMatchList);
-
-      match = phaseMatchList.poll();
-
-      if (skipMatch(match)) {
-        LOGGER.debug("Skip match: {}", match);
-      } else {
-        break;
-      }
-    }
-
-    // If sets have merged since the rule match was enqueued, the match
-    // may not be removed from the matchMap because the subset may have
-    // changed, it is OK to leave it since the matchMap will be cleared
-    // at the end.
-    phaseMatchList.matchMap.remove(
-        planner.getSubset(match.rels[0]), match);
-
-    LOGGER.debug("Pop match: {}", match);
-    return match;
-  }
+  public abstract void addMatch(VolcanoRuleMatch match);
 
   /**
-   * Dumps rules queue to the logger when debug level is set to {@code TRACE}.
+   * clear this rule queue.
+   * The return value indicates whether the rule queue was empty before clear.
+   * @return true if the rule queue was not empty
    */
-  private void dumpRuleQueue(PhaseMatchList phaseMatchList) {
-    if (LOGGER.isTraceEnabled()) {
-      StringBuilder b = new StringBuilder();
-      b.append("Rule queue:");
-      for (VolcanoRuleMatch rule : phaseMatchList.preQueue) {
-        b.append("\n");
-        b.append(rule);
-      }
-      for (VolcanoRuleMatch rule : phaseMatchList.queue) {
-        b.append("\n");
-        b.append(rule);
-      }
-      LOGGER.trace(b.toString());
-    }
-  }
+  public abstract boolean clear();
 
-  /**
-   * Dumps planner's state to the logger when debug level is set to {@code TRACE}.
-   */
-  private void dumpPlannerState() {
-    if (LOGGER.isTraceEnabled()) {
-      StringWriter sw = new StringWriter();
-      PrintWriter pw = new PrintWriter(sw);
-      planner.dump(pw);
-      pw.flush();
-      LOGGER.trace(sw.toString());
-      planner.getRoot().getCluster().invalidateMetadataQuery();
-    }
-  }
 
   /** Returns whether to skip a match. This happens if any of the
    * {@link RelNode}s have importance zero. */
-  private boolean skipMatch(VolcanoRuleMatch match) {
+  protected boolean skipMatch(VolcanoRuleMatch match) {
     for (RelNode rel : match.rels) {
       if (planner.prunedNodes.contains(rel)) {
         return true;
@@ -292,73 +109,4 @@ class RuleQueue {
       assert x == subset;
     }
   }
-
-  //~ Inner Classes ----------------------------------------------------------
-
-  /**
-   * PhaseMatchList represents a set of {@link VolcanoRuleMatch rule-matches}
-   * for a particular
-   * {@link VolcanoPlannerPhase phase of the planner's execution}.
-   */
-  private static class PhaseMatchList {
-    /**
-     * The VolcanoPlannerPhase that this PhaseMatchList is used in.
-     */
-    final VolcanoPlannerPhase phase;
-
-    /**
-     * Rule match queue for SubstitutionRule
-     */
-    private final Queue<VolcanoRuleMatch> preQueue = new LinkedList<>();
-
-    /**
-     * Current list of VolcanoRuleMatches for this phase. New rule-matches
-     * are appended to the end of this queue.
-     * The rules are not sorted in any way.
-     */
-    private final Queue<VolcanoRuleMatch> queue = new LinkedList<>();
-
-    /**
-     * A set of rule-match names contained in {@link #queue}. Allows fast
-     * detection of duplicate rule-matches.
-     */
-    final Set<String> names = new HashSet<>();
-
-    /**
-     * Multi-map of RelSubset to VolcanoRuleMatches.
-     */
-    final Multimap<RelSubset, VolcanoRuleMatch> matchMap =
-        HashMultimap.create();
-
-    PhaseMatchList(VolcanoPlannerPhase phase) {
-      this.phase = phase;
-    }
-
-    int size() {
-      return preQueue.size() + queue.size();
-    }
-
-    VolcanoRuleMatch poll() {
-      VolcanoRuleMatch match = preQueue.poll();
-      if (match == null) {
-        match = queue.poll();
-      }
-      return match;
-    }
-
-    void offer(VolcanoRuleMatch match) {
-      if (match.getRule() instanceof SubstitutionRule) {
-        preQueue.offer(match);
-      } else {
-        queue.offer(match);
-      }
-    }
-
-    void clear() {
-      preQueue.clear();
-      queue.clear();
-      names.clear();
-      matchMap.clear();
-    }
-  }
 }
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java b/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java
new file mode 100644
index 0000000..0443159
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java
@@ -0,0 +1,898 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.DeriveMode;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.trace.CalciteTrace;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+import java.util.function.Predicate;
+
+class TopDownRuleDriver implements RuleDriver {
+
+  private static final Logger LOGGER = CalciteTrace.getPlannerTaskTracer();
+
+  private final VolcanoPlanner planner;
+
+  /**
+   * The rule queue designed for top-down rule applying.
+   */
+  private final TopDownRuleQueue ruleQueue;
+
+  /**
+   * All tasks waiting for execution.
+   */
+  private Stack<Task> tasks = new Stack<>();
+
+  /**
+   * A task that is currently applying and may generate new RelNode.
+   * It provides a callback to schedule tasks for new RelNodes that
+   * are registered during task performing.
+   */
+  private GeneratorTask applying = null;
+
+  /**
+   * RelNodes that are generated by passThrough or derive
+   * these nodes will not takes part in another passThrough or derive.
+   */
+  private Set<RelNode> passThroughCache = new HashSet<>();
+
+  //~ Constructors -----------------------------------------------------------
+
+  TopDownRuleDriver(VolcanoPlanner planner) {
+    this.planner = planner;
+    ruleQueue = new TopDownRuleQueue(planner);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @Override public void drive() {
+    TaskDescriptor description = new TaskDescriptor();
+    tasks.push(new OptimizeGroup(planner.root, planner.infCost));
+    exploreMaterializationRoots();
+    while (!tasks.isEmpty()) {
+      Task task = tasks.pop();
+      description.log(task);
+      task.perform();
+    }
+  }
+
+  private void exploreMaterializationRoots() {
+    for (RelSubset extraRoot : planner.explorationRoots) {
+      RelSet rootSet = VolcanoPlanner.equivRoot(extraRoot.set);
+      if (rootSet == planner.root.set) {
+        continue;
+      }
+      for (RelNode rel : extraRoot.set.rels) {
+        if (planner.isLogical(rel)) {
+          tasks.push(new OptimizeMExpr(rel, extraRoot, true));
+        }
+      }
+    }
+  }
+
+  @Override public TopDownRuleQueue getRuleQueue() {
+    return ruleQueue;
+  }
+
+  @Override public void clear() {
+    ruleQueue.clear();
+    tasks.clear();
+    passThroughCache.clear();
+    applying = null;
+  }
+
+  private interface Procedure {
+    void exec();
+  }
+
+  private void applyGenerator(GeneratorTask task, Procedure proc) {
+    GeneratorTask applying = this.applying;
+    this.applying = task;
+    try {
+      proc.exec();
+    } finally {
+      this.applying = applying;
+    }
+  }
+
+  public void onSetMerged(RelSet set) {
+    applyGenerator(null, () -> clearProcessed(set));
+  }
+
+  private void clearProcessed(RelSet set) {
+    boolean explored = set.exploringState != null;
+    set.exploringState = null;
+
+    for (RelSubset subset : set.subsets) {
+      if (subset.resetTaskState() || explored) {
+        Collection<RelNode> parentRels = subset.getParentRels();
+        for (RelNode parentRel : parentRels) {
+          clearProcessed(planner.getSet(parentRel));
+        }
+        if (subset == planner.root) {
+          tasks.push(new OptimizeGroup(subset, planner.infCost));
+        }
+      }
+    }
+  }
+
+  public void onProduce(RelNode node, RelSubset subset) {
+    if (applying == null || subset.set
+        != VolcanoPlanner.equivRoot(applying.group().set)) {
+      return;
+    }
+
+    if (!applying.onProduce(node)) {
+      return;
+    }
+
+    if (!planner.isLogical(node)) {
+      RelSubset optimizingGroup = null;
+      boolean canPassThrough = node instanceof PhysicalNode
+          && !passThroughCache.contains(node);
+      if (!canPassThrough && subset.taskState != null) {
+        optimizingGroup = subset;
+      } else {
+        RelOptCost upperBound = planner.zeroCost;
+        RelSet set = subset.getSet();
+        List<RelSubset> subsetsToPassThrough = new ArrayList<>();
+        for (RelSubset otherSubset : set.subsets) {
+          if (!otherSubset.isRequired() || otherSubset != planner.root
+              && otherSubset.taskState != RelSubset.OptimizeState.OPTIMIZING) {
+            continue;
+          }
+          if (node.getTraitSet().satisfies(otherSubset.getTraitSet())) {
+            if (upperBound.isLt(otherSubset.upperBound)) {
+              upperBound = otherSubset.upperBound;
+              optimizingGroup = otherSubset;
+            }
+          } else if (canPassThrough) {
+            subsetsToPassThrough.add(otherSubset);
+          }
+        }
+        for (RelSubset otherSubset : subsetsToPassThrough) {
+          Task task = getOptimizeInputTask(node, otherSubset);
+          if (task != null) {
+            tasks.push(task);
+          }
+        }
+      }
+      if (optimizingGroup == null) {
+        return;
+      }
+      Task task = getOptimizeInputTask(node, optimizingGroup);
+      if (task != null) {
+        tasks.push(task);
+      }
+    } else {
+      boolean optimizing = subset.set.subsets.stream()
+          .anyMatch(s -> s.taskState == RelSubset.OptimizeState.OPTIMIZING);
+      tasks.push(
+          new OptimizeMExpr(node, applying.group(),
+              applying.exploring() && !optimizing));
+    }
+  }
+
+  //~ Inner Classes ----------------------------------------------------------
+
+  /**
+   * Base class for planner task.
+   */
+  private interface Task {
+    void perform();
+    void describe(TaskDescriptor desc);
+  }
+
+  /**
+   * A class for task logging.
+   */
+  private static class TaskDescriptor {
+    private boolean first = true;
+    private StringBuilder builder = new StringBuilder();
+
+    void log(Task task) {
+      if (!LOGGER.isDebugEnabled()) {
+        return;
+      }
+      first = true;
+      builder.setLength(0);
+      builder.append("Execute task: ").append(task.getClass().getSimpleName());
+      task.describe(this);
+      if (!first) {
+        builder.append(")");
+      }
+
+      LOGGER.info(builder.toString());
+    }
+
+    TaskDescriptor item(String name, Object value) {
+      if (first) {
+        first = false;
+        builder.append("(");
+      } else {
+        builder.append(", ");
+      }
+      builder.append(name).append("=").append(value);
+      return this;
+    }
+  }
+
+  private interface GeneratorTask extends Task {
+    RelSubset group();
+    boolean exploring();
+    default boolean onProduce(RelNode node) {
+      return true;
+    }
+  }
+
+  /**
+   * O_GROUP.
+   */
+  private class OptimizeGroup implements Task {
+    private final RelSubset group;
+    private RelOptCost upperBound;
+
+    OptimizeGroup(RelSubset group, RelOptCost upperBound) {
+      this.group = group;
+      this.upperBound = upperBound;
+    }
+
+    @Override public void perform() {
+      RelOptCost winner = group.getWinnerCost();
+      if (winner != null) {
+        return;
+      }
+
+      if (group.taskState != null && upperBound.isLe(group.upperBound)) {
+        // this group failed to optimize before or it is a ring
+        return;
+      }
+
+      group.startOptimize(upperBound);
+
+      // cannot decide an actual lower bound before MExpr are fully explored
+      // so delay the lower bound checking
+
+      // a gate keeper to update context
+      tasks.push(new GroupOptimized(group));
+
+      // optimize mExprs in group
+      List<RelNode> physicals = new ArrayList<>();
+      for (RelNode rel : group.set.rels) {
+        if (planner.isLogical(rel)) {
+          tasks.push(new OptimizeMExpr(rel, group, false));
+        } else if (rel.isEnforcer()) {
+          // Enforcers have lower priority than other physical nodes
+          physicals.add(0, rel);
+        } else {
+          physicals.add(rel);
+        }
+      }
+      // always apply O_INPUTS first so as to get an valid upper bound
+      for (RelNode rel : physicals) {
+        Task task = getOptimizeInputTask(rel, group);
+        if (task != null) {
+          tasks.add(task);
+        }
+      }
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("group", group).item("upperBound", upperBound);
+    }
+  }
+
+  /**
+   * Mark the group optimized.
+   */
+  private static class GroupOptimized implements Task {
+    private final RelSubset group;
+
+    GroupOptimized(RelSubset group) {
+      this.group = group;
+    }
+
+    @Override public void perform() {
+      group.setOptimized();
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("group", group);
+    }
+  }
+
+  /**
+   * O_EXPR.
+   */
+  private class OptimizeMExpr implements Task {
+    private final RelNode mExpr;
+    private final RelSubset group;
+    private final boolean explore;
+
+    OptimizeMExpr(RelNode mExpr,
+        RelSubset group, boolean explore) {
+      this.mExpr = mExpr;
+      this.group = group;
+      this.explore = explore;
+    }
+
+    @Override public void perform() {
+      if (explore && group.isExplored()) {
+        return;
+      }
+      // 1. explode input
+      // 2. apply other rules
+      tasks.push(new ApplyRules(mExpr, group, explore));
+      for (int i = mExpr.getInputs().size() - 1; i >= 0; --i) {
+        tasks.push(new ExploreInput(mExpr, i));
+      }
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("mExpr", mExpr).item("explore", explore);
+    }
+  }
+
+  /**
+   * Ensure ExploreInput are working on the correct input group since calcite
+   * may merge sets.
+   */
+  private class EnsureGroupExplored implements Task {
+
+    private final RelSubset input;
+    private final RelNode parent;
+    private final int inputOrdinal;
+
+    EnsureGroupExplored(RelSubset input, RelNode parent, int inputOrdinal) {
+      this.input = input;
+      this.parent = parent;
+      this.inputOrdinal = inputOrdinal;
+    }
+
+    @Override public void perform() {
+      if (parent.getInput(inputOrdinal) != input) {
+        tasks.push(new ExploreInput(parent, inputOrdinal));
+        return;
+      }
+      input.setExplored();
+      for (RelSubset subset : input.getSet().subsets) {
+        // clear the LB cache as exploring state have changed
+        input.getCluster().getMetadataQuery().clearCache(subset);
+      }
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("mExpr", parent).item("i", inputOrdinal);
+    }
+  }
+
+  /**
+   * E_GROUP.
+   */
+  private class ExploreInput implements Task {
+    private final RelSubset group;
+    private final RelNode parent;
+    private final int inputOrdinal;
+
+    ExploreInput(RelNode parent, int inputOrdinal) {
+      this.group = (RelSubset) parent.getInput(inputOrdinal);
+      this.parent = parent;
+      this.inputOrdinal = inputOrdinal;
+    }
+
+    @Override public void perform() {
+      if (!group.explore()) {
+        return;
+      }
+      tasks.push(new EnsureGroupExplored(group, parent, inputOrdinal));
+      for (RelNode rel : group.set.rels) {
+        if (planner.isLogical(rel)) {
+          tasks.push(new OptimizeMExpr(rel, group, true));
+        }
+      }
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("group", group);
+    }
+  }
+
+  /**
+   * Extract rule matches from rule queue and add them to task stack.
+   */
+  private class ApplyRules implements Task {
+    private final RelNode mExpr;
+    private final RelSubset group;
+    private final boolean exploring;
+
+    ApplyRules(RelNode mExpr, RelSubset group, boolean exploring) {
+      this.mExpr = mExpr;
+      this.group = group;
+      this.exploring = exploring;
+    }
+
+    @Override public void perform() {
+      Pair<RelNode, Predicate<VolcanoRuleMatch>> category =
+          exploring ? Pair.of(mExpr, planner::isTransformationRule)
+              : Pair.of(mExpr, m -> true);
+      VolcanoRuleMatch match = ruleQueue.popMatch(category);
+      while (match != null) {
+        tasks.push(new ApplyRule(match, group, exploring));
+        match = ruleQueue.popMatch(category);
+      }
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("mExpr", mExpr).item("exploring", exploring);
+    }
+  }
+
+  /**
+   * APPLY_RULE.
+   */
+  private class ApplyRule implements GeneratorTask {
+    private final VolcanoRuleMatch match;
+    private final RelSubset group;
+    private final boolean exploring;
+
+    ApplyRule(VolcanoRuleMatch match, RelSubset group, boolean exploring) {
+      this.match = match;
+      this.group = group;
+      this.exploring = exploring;
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("match", match).item("exploring", exploring);
+    }
+
+    @Override public void perform() {
+      applyGenerator(this, match::onMatch);
+    }
+
+    @Override public RelSubset group() {
+      return group;
+    }
+
+    @Override public boolean exploring() {
+      return exploring;
+    }
+  }
+
+  private Task getOptimizeInputTask(RelNode rel, RelSubset group) {
+    if (!rel.getTraitSet().satisfies(group.getTraitSet())) {
+      RelNode passThroughRel = convert(rel, group);
+      if (passThroughRel == null) {
+        LOGGER.debug("Skip optimizing because of traits: {}", rel);
+        return null;
+      }
+      final RelNode finalPassThroughRel = passThroughRel;
+      applyGenerator(null, () ->
+          planner.register(finalPassThroughRel, group));
+      rel = passThroughRel;
+    }
+    boolean unProcess = false;
+    for (RelNode input : rel.getInputs()) {
+      RelOptCost winner = ((RelSubset) input).getWinnerCost();
+      if (winner == null) {
+        unProcess = true;
+        break;
+      }
+    }
+    if (!unProcess) {
+      return new DeriveTrait(rel, group);
+    }
+    if (rel.getInputs().size() == 1) {
+      return new OptimizeInput1(rel, group);
+    }
+    return new OptimizeInputs(rel, group);
+  }
+
+  private RelNode convert(RelNode rel, RelSubset group) {
+    if (!passThroughCache.contains(rel)) {
+      if (checkLowerBound(rel, group)) {
+        RelNode passThrough = group.passThrough(rel);
+        if (passThrough != null) {
+          passThroughCache.add(passThrough);
+          return passThrough;
+        }
+      } else {
+        LOGGER.debug("Skip pass though because of lower bound. LB = {}, UP = {}",
+            rel, group.upperBound);
+      }
+    }
+    VolcanoRuleMatch match = ruleQueue.popMatch(
+        Pair.of(rel,
+            m -> m.getRule() instanceof ConverterRule
+                && m.getRule().getOutTrait().satisfies(group.getTraitSet().getConvention())));
+    if (match != null) {
+      tasks.add(new ApplyRule(match, group, false));
+    }
+    return null;
+  }
+
+  private boolean checkLowerBound(RelNode rel, RelSubset group) {
+    RelOptCost upperBound = group.upperBound;
+    if (upperBound.isInfinite()) {
+      return true;
+    }
+    RelOptCost lb = planner.getLowerBound(rel);
+    return !upperBound.isLe(lb);
+  }
+
+  /**
+   * O_INPUT when there is only one input.
+   */
+  private class OptimizeInput1 implements Task {
+
+    private final RelNode mExpr;
+    private final RelSubset group;
+
+    OptimizeInput1(RelNode mExpr, RelSubset group) {
+      this.mExpr = mExpr;
+      this.group = group;
+    }
+
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("mExpr", mExpr).item("upperBound", group.upperBound);
+    }
+
+    @Override public void perform() {
+      RelOptCost upperBound = group.upperBound;
+      RelOptCost upperForInput = planner.upperBoundForInputs(mExpr, upperBound);
+      if (upperForInput.isLe(planner.zeroCost)) {
+        LOGGER.debug(
+            "Skip O_INPUT because of lower bound. UB4Inputs = {}, UB = {}",
+            upperForInput, upperBound);
+        return;
+      }
+
+      RelSubset input = (RelSubset) mExpr.getInput(0);
+
+      // Apply enforcing rules
+      tasks.push(new DeriveTrait(mExpr, group));
+
+      tasks.push(new CheckInput(null, mExpr, input, 0, upperForInput));
+      tasks.push(new OptimizeGroup(input, upperForInput));
+    }
+  }
+
+  /**
+   * O_INPUT.
+   */
+  private class OptimizeInputs implements Task {
+
+    private final RelNode mExpr;
+    private final RelSubset group;
+    private final int childCount;
+    private RelOptCost upperBound;
+    private RelOptCost upperForInput;
+    private int processingChild;
+
+    OptimizeInputs(RelNode rel, RelSubset group) {
+      this.mExpr = rel;
+      this.group = group;
+      this.upperBound = group.upperBound;
+      this.upperForInput = planner.infCost;
+      this.childCount = rel.getInputs().size();
+      this.processingChild = 0;
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("mExpr", mExpr).item("upperBound", upperBound)
+          .item("processingChild", processingChild);
+    }
+
+    private List<RelOptCost> lowerBounds;
+    private RelOptCost lowerBoundSum;
+    @Override public void perform() {
+      RelOptCost bestCost = group.bestCost;
+      if (!bestCost.isInfinite()) {
+        if (bestCost.isLt(upperBound)) {
+          upperBound = bestCost;
+          upperForInput = planner.upperBoundForInputs(mExpr, upperBound);
+        }
+
+        if (lowerBoundSum == null) {
+          if (upperForInput.isInfinite()) {
+            upperForInput = planner.upperBoundForInputs(mExpr, upperBound);
+          }
+          lowerBounds = new ArrayList<>(childCount);
+          for (RelNode input : mExpr.getInputs()) {
+            RelOptCost lb = planner.getLowerBound(input);
+            lowerBounds.add(lb);
+            lowerBoundSum = lowerBoundSum == null ? lb : lowerBoundSum.plus(lb);
+          }
+        }
+        if (upperForInput.isLt(lowerBoundSum)) {
+          LOGGER.debug(
+              "Skip O_INPUT because of lower bound. LB = {}, UP = {}",
+              lowerBoundSum, upperForInput);
+          return; // group pruned
+        }
+      }
+
+      if (lowerBoundSum != null && lowerBoundSum.isInfinite()) {
+        LOGGER.debug("Skip O_INPUT as one of the inputs fail to optimize");
+        return;
+      }
+
+      if (processingChild == 0) {
+        // Apply enforcing rules
+        tasks.push(new DeriveTrait(mExpr, group));
+      }
+
+      while (processingChild < childCount) {
+        RelSubset input =
+            (RelSubset) mExpr.getInput(processingChild);
+
+        RelOptCost winner = input.getWinnerCost();
+        if (winner != null) {
+          ++ processingChild;
+          continue;
+        }
+
+        RelOptCost upper = upperForInput;
+        if (!upper.isInfinite()) {
+          upper = upperForInput.minus(lowerBoundSum)
+              .plus(lowerBounds.get(processingChild));
+        }
+        if (input.taskState != null && upper.isLe(input.upperBound)) {
+          return;
+        }
+
+        if (processingChild != childCount - 1) {
+          tasks.push(this);
+        }
+        tasks.push(new CheckInput(this, mExpr, input, processingChild, upper));
+        tasks.push(new OptimizeGroup(input, upper));
+        ++ processingChild;
+        break;
+      }
+    }
+  }
+
+  /**
+   * Ensure input is optimized correctly and modify context.
+   */
+  private class CheckInput implements Task {
+
+    private final OptimizeInputs context;
+    private final RelOptCost upper;
+    private final RelNode parent;
+    private RelSubset input;
+    private final int i;
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("parent", parent).item("i", i);
+    }
+
+    CheckInput(OptimizeInputs context,
+        RelNode parent, RelSubset input, int i, RelOptCost upper) {
+      this.context = context;
+      this.parent = parent;
+      this.input = input;
+      this.i = i;
+      this.upper = upper;
+    }
+
+    @Override public void perform() {
+      if (input != parent.getInput(i)) {
+        input = (RelSubset) parent.getInput(i);
+        tasks.push(this);
+        tasks.push(new OptimizeGroup(input, upper));
+        return;
+      }
+      if (context == null) {
+        return;
+      }
+      RelOptCost winner = input.getWinnerCost();
+      if (winner == null) {
+        // the input fail to optimize due to group pruning
+        context.lowerBoundSum = planner.infCost;
+        return;
+      }
+      if (context.lowerBoundSum != null && context.lowerBoundSum != planner.infCost) {
+        context.lowerBoundSum = context.
+            lowerBoundSum.minus(context.lowerBounds.get(i));
+        context.lowerBoundSum = context.lowerBoundSum.plus(winner);
+        context.lowerBounds.set(i, winner);
+      }
+    }
+  }
+
+  private class DeriveTrait implements GeneratorTask {
+
+    private final RelNode mExpr;
+    private final RelSubset group;
+
+    DeriveTrait(RelNode mExpr, RelSubset group) {
+      this.mExpr = mExpr;
+      this.group = group;
+    }
+
+    @Override public void perform() {
+      List<RelNode> inputs = mExpr.getInputs();
+      for (RelNode input : inputs) {
+        if (((RelSubset) input).getWinnerCost() == null) {
+          // fail to optimize input, then no need to deliver traits
+          return;
+        }
+      }
+      tasks.push(new ApplyRules(mExpr, group, false));
+      if (!passThroughCache.contains(mExpr)) {
+        applyGenerator(this, this::derive);
+      }
+    }
+
+    private void derive() {
+      if (!(mExpr instanceof PhysicalNode)
+          || ((PhysicalNode) mExpr).getDeriveMode() == DeriveMode.PROHIBITED) {
+        return;
+      }
+
+      PhysicalNode rel = (PhysicalNode) mExpr;
+      DeriveMode mode = rel.getDeriveMode();
+      int arity = rel.getInputs().size();
+      // for OMAKASE
+      List<List<RelTraitSet>> inputTraits = new ArrayList<>(arity);
+
+      for (int i = 0; i < arity; i++) {
+        int childId = i;
+        if (mode == DeriveMode.RIGHT_FIRST) {
+          childId = arity - i - 1;
+        }
+
+        RelSubset input = (RelSubset) rel.getInput(childId);
+        List<RelTraitSet> traits = new ArrayList<>();
+        inputTraits.add(traits);
+
+        final int numSubset = input.set.subsets.size();
+        for (int j = 0; j < numSubset; j++) {
+          RelSubset subset = input.set.subsets.get(j);
+          if (!subset.isDelivered() || equalsSansConvention(
+              subset.getTraitSet(), rel.getCluster().traitSet())) {
+            // Ideally we should stop deriving new relnodes when the
+            // subset's traitSet equals with input traitSet, but
+            // in case someone manually builds a physical relnode
+            // tree, which is highly discouraged, without specifying
+            // correct traitSet, e.g.
+            //   EnumerableFilter  [].ANY
+            //       -> EnumerableMergeJoin  [a].Hash[a]
+            // We should still be able to derive the correct traitSet
+            // for the dumb filter, even though the filter's traitSet
+            // should be derived from the MergeJoin when it is created.
+            // But if the subset's traitSet equals with the default
+            // empty traitSet sans convention (the default traitSet
+            // from cluster may have logical convention, NONE, which
+            // is not interesting), we are safe to ignore it, because
+            // a physical filter with non default traitSet, but has a
+            // input with default empty traitSet, e.g.
+            //   EnumerableFilter  [a].Hash[a]
+            //       -> EnumerableProject  [].ANY
+            // is definitely wrong, we should fail fast.
+            continue;
+          }
+
+          if (mode == DeriveMode.OMAKASE) {
+            traits.add(subset.getTraitSet());
+          } else {
+            RelNode newRel = rel.derive(subset.getTraitSet(), childId);
+            if (newRel != null && !planner.isRegistered(newRel)) {
+              RelNode newInput = newRel.getInput(childId);
+              assert newInput instanceof RelSubset;
+              if (newInput == subset) {
+                // If the child subset is used to derive new traits for
+                // current relnode, the subset will be marked REQUIRED
+                // when registering the new derived relnode and later
+                // will add enforcers between other delivered subsets.
+                // e.g. a MergeJoin request both inputs hash distributed
+                // by [a,b] sorted by [a,b]. If the left input R1 happens to
+                // be distributed by [a], the MergeJoin can derive new
+                // traits from this input and request both input to be
+                // distributed by [a] sorted by [a,b]. In case there is a
+                // alternative R2 with ANY distribution in the left input's
+                // RelSet, we may end up with requesting hash distribution
+                // [a] on alternative R2, which is unnecessary and waste,
+                // because we request distribution by [a] because of R1 can
+                // deliver the exact same distribution and we don't need to
+                // enforce properties on other subsets that can't satisfy
+                // the specific trait requirement.
+                // Here we add a constraint that {@code newInput == subset},
+                // because if the delivered child subset is HASH[a], but
+                // we require HASH[a].SORT[a,b], we still need to enable
+                // property enforcement on the required subset. Otherwise,
+                // we need to restrict enforcement between HASH[a].SORT[a,b]
+                // and HASH[a] only, which will make things a little bit
+                // complicated. We might optimize it in the future.
+                subset.disableEnforcing();
+              }
+              RelSubset relSubset = planner.register(newRel, rel);
+              assert relSubset.set == planner.getSubset(rel).set;
+            }
+          }
+        }
+
+        if (mode == DeriveMode.LEFT_FIRST
+            || mode == DeriveMode.RIGHT_FIRST) {
+          break;
+        }
+      }
+
+      if (mode == DeriveMode.OMAKASE) {
+        List<RelNode> relList = rel.derive(inputTraits);
+        for (RelNode relNode : relList) {
+          if (!planner.isRegistered(relNode)) {
+            planner.register(relNode, rel);
+          }
+        }
+      }
+    }
+
+
+    /**
+     * Returns whether the 2 traitSets are equal without Convention.
+     * It assumes they have the same traitDefs order.
+     */
+    private boolean equalsSansConvention(RelTraitSet ts1, RelTraitSet ts2) {
+      assert ts1.size() == ts2.size();
+      for (int i = 0; i < ts1.size(); i++) {
+        RelTrait trait = ts1.getTrait(i);
+        if (trait.getTraitDef() == ConventionTraitDef.INSTANCE) {
+          continue;
+        }
+        if (!trait.equals(ts2.getTrait(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override public void describe(TaskDescriptor desc) {
+      desc.item("mExpr", mExpr).item("group", group);
+    }
+
+    @Override public RelSubset group() {
+      return group;
+    }
+
+    @Override public boolean exploring() {
+      return false;
+    }
+
+    @Override public boolean onProduce(RelNode node) {
+      passThroughCache.add(node);
+      return true;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleQueue.java b/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleQueue.java
new file mode 100644
index 0000000..2c142c1
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleQueue.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.Pair;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * A rule queue that manage rule matches for cascade planner.
+ */
+class TopDownRuleQueue extends RuleQueue {
+
+  private final Map<RelNode, List<VolcanoRuleMatch>> matches = new HashMap<>();
+
+  private final Set<String> names = new HashSet<>();
+
+  TopDownRuleQueue(VolcanoPlanner planner) {
+    super(planner);
+  }
+
+  public void addMatch(VolcanoRuleMatch match) {
+    RelNode rel = match.rel(0);
+    List<VolcanoRuleMatch> queue = matches.
+        computeIfAbsent(rel, id -> new LinkedList<>());
+    addMatch(match, queue);
+  }
+
+  private void addMatch(VolcanoRuleMatch match, List<VolcanoRuleMatch> queue) {
+    if (!names.add(match.toString())) {
+      return;
+    }
+
+    if (!planner.isSubstituteRule(match)) {
+      queue.add(0, match);
+    } else {
+      queue.add(match);
+    }
+  }
+
+  public VolcanoRuleMatch popMatch(Pair<RelNode, Predicate<VolcanoRuleMatch>> category) {
+    List<VolcanoRuleMatch> queue = matches.get(category.left);
+    if (queue == null) {
+      return null;
+    }
+    Iterator<VolcanoRuleMatch> iterator = queue.iterator();
+    while (iterator.hasNext()) {
+      VolcanoRuleMatch next = iterator.next();
+      if (category.right != null && !category.right.test(next)) {
+        continue;
+      }
+      iterator.remove();
+      if (!skipMatch(next)) {
+        return next;
+      }
+    }
+    return null;
+  }
+
+  @Override public boolean clear() {
+    boolean empty = matches.isEmpty();
+    matches.clear();
+    names.clear();
+    return !empty;
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
index 54615db..e389863 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
@@ -48,6 +48,7 @@ import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.SubstitutionRule;
 import org.apache.calcite.rel.rules.TransformationRule;
 import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.sql.SqlExplainLevel;
@@ -59,6 +60,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 
+import org.apiguardian.api.API;
+
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayDeque;
@@ -133,9 +136,9 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
   private final Set<RelOptSchema> registeredSchemas = new HashSet<>();
 
   /**
-   * Holds rule calls waiting to be fired.
+   * A driver to manage rule and rule matches.
    */
-  final RuleQueue ruleQueue = new RuleQueue(this);
+  RuleDriver ruleDriver;
 
   /**
    * Holds the currently registered RelTraitDefs.
@@ -146,6 +149,8 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
 
   private RelNode originalRoot;
 
+  private Convention rootConvention;
+
   /**
    * Whether the planner can accept new rules.
    */
@@ -171,22 +176,21 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
 
   /** Zero cost, according to {@link #costFactory}. Not necessarily a
    * {@link org.apache.calcite.plan.volcano.VolcanoCost}. */
-  private final RelOptCost zeroCost;
+  final RelOptCost zeroCost;
 
-  /**
-   * Optimization tasks including trait propagation, enforcement.
-   */
-  final Deque<OptimizeTask> tasks = new ArrayDeque<>();
+  /** Infinite cost, according to {@link #costFactory}. Not necessarily a
+   * {@link org.apache.calcite.plan.volcano.VolcanoCost}. */
+  final RelOptCost infCost;
 
   /**
-   * The id generator for optimization tasks.
+   * Whether to enable top-down optimization or not.
    */
-  int nextTaskId = 0;
+  boolean topDownOpt = CalciteSystemProperty.TOPDOWN_OPT.value();
 
   /**
-   * Whether to enable top-down optimization or not.
+   * Extra roots for explorations.
    */
-  boolean topDownOpt = CalciteSystemProperty.TOPDOWN_OPT.value();
+  Set<RelSubset> explorationRoots = new HashSet<>();
 
   //~ Constructors -----------------------------------------------------------
 
@@ -216,9 +220,19 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     super(costFactory == null ? VolcanoCost.FACTORY : costFactory,
         externalContext);
     this.zeroCost = this.costFactory.makeZeroCost();
+    this.infCost = this.costFactory.makeInfiniteCost();
     // If LOGGER is debug enabled, enable provenance information to be captured
     this.provenanceMap = LOGGER.isDebugEnabled() ? new HashMap<>()
         : Util.blackholeMap();
+    initRuleQueue();
+  }
+
+  private void initRuleQueue() {
+    if (topDownOpt) {
+      ruleDriver = new TopDownRuleDriver(this);
+    } else {
+      ruleDriver = new IterativeRuleDriver(this);
+    }
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -236,11 +250,15 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
   /**
    * Enable or disable top-down optimization.
    *
-   * <p>Note: Enabling top-down optimization will automatically disable
-   * the use of AbstractConverter and related rules.</p>
+   * <p>Note: Enabling top-down optimization will automatically enable
+   * top-down trait propagation.</p>
    */
   public void setTopDownOpt(boolean value) {
+    if (topDownOpt == value) {
+      return;
+    }
     topDownOpt = value;
+    initRuleQueue();
   }
 
   // implement RelOptPlanner
@@ -259,6 +277,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
       this.originalRoot = rel;
     }
 
+    rootConvention = this.root.getConvention();
     ensureRootConverters();
   }
 
@@ -311,6 +330,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     }
     for (RelOptMaterialization materialization : applicableMaterializations) {
       RelSubset subset = registerImpl(materialization.queryRel, null);
+      explorationRoots.add(subset);
       RelNode tableRel2 =
           RelOptUtil.createCastRel(
               materialization.tableRel,
@@ -382,7 +402,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     this.mapDigestToRel.clear();
     this.mapRel2Subset.clear();
     this.prunedNodes.clear();
-    this.ruleQueue.clear();
+    this.ruleDriver.clear();
     this.materializations.clear();
     this.latticeByName.clear();
     this.provenanceMap.clear();
@@ -490,13 +510,6 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
    * Finds the most efficient expression to implement the query given via
    * {@link org.apache.calcite.plan.RelOptPlanner#setRoot(org.apache.calcite.rel.RelNode)}.
    *
-   * <p>The algorithm executes repeatedly in a series of phases. In each phase
-   * the exact rules that may be fired varies. The mapping of phases to rule
-   * sets is maintained in the {@link #ruleQueue}.
-   *
-   * <p>In each phase, the planner then iterates over the rule matches presented
-   * by the rule queue until the rule queue becomes empty.
-   *
    * @return the most efficient RelNode tree found for implementing the given
    * query
    */
@@ -504,54 +517,14 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     ensureRootConverters();
     registerMaterializations();
 
-    PLANNING:
-    for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
-      while (true) {
-        LOGGER.debug("PLANNER = {}; PHASE = {}; COST = {}",
-            this, phase.toString(), root.bestCost);
-
-        VolcanoRuleMatch match = ruleQueue.popMatch(phase);
-        if (match == null) {
-          break;
-        }
-
-        assert match.getRule().matches(match);
-        try {
-          match.onMatch();
-        } catch (VolcanoTimeoutException e) {
-          LOGGER.warn("Volcano planning times out, cancels the subsequent optimization.");
-          root = canonize(root);
-          ruleQueue.phaseCompleted(phase);
-          break PLANNING;
-        }
-
-        // The root may have been merged with another
-        // subset. Find the new root subset.
-        root = canonize(root);
-      }
-
-      ruleQueue.phaseCompleted(phase);
-    }
-
-    if (topDownOpt) {
-      tasks.push(OptimizeTask.create(root));
-      while (!tasks.isEmpty()) {
-        OptimizeTask task = tasks.peek();
-        if (task.hasSubTask()) {
-          tasks.push(task.nextSubTask());
-          continue;
-        }
-        task = tasks.pop();
-        task.execute();
-      }
-    }
+    ruleDriver.drive();
 
     if (LOGGER.isTraceEnabled()) {
       StringWriter sw = new StringWriter();
       final PrintWriter pw = new PrintWriter(sw);
       dump(pw);
       pw.flush();
-      LOGGER.trace(sw.toString());
+      LOGGER.info(sw.toString());
     }
     dumpRuleAttemptsInfo();
     RelNode cheapest = root.buildCheapestPlan(this);
@@ -776,11 +749,6 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     return set.getSubset(traits);
   }
 
-  boolean isSeedNode(RelNode node) {
-    final RelSet set = getSubset(node).set;
-    return set.seeds.contains(node);
-  }
-
   RelNode changeTraitsUsingConverters(
       RelNode rel,
       RelTraitSet toTraits) {
@@ -987,6 +955,13 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
   }
 
   /**
+   * Find the new root subset in case the root is merged with another subset
+   */
+  void canonize() {
+    root = canonize(root);
+  }
+
+  /**
    * If a subset has one or more equivalent subsets (owing to a set having
    * merged with another), returns the subset which is the leader of the
    * equivalence class.
@@ -1088,6 +1063,9 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
       ensureRootConverters();
     }
 
+    if (ruleDriver != null) {
+      ruleDriver.onSetMerged(set);
+    }
     return set;
   }
 
@@ -1258,8 +1236,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     final int subsetBeforeCount = set.subsets.size();
     RelSubset subset = addRelToSet(rel, set);
 
-    final RelNode xx = mapDigestToRel.put(digest, rel);
-    assert xx == null || xx == rel : rel.getDigest();
+    final RelNode xx = mapDigestToRel.putIfAbsent(digest, rel);
 
     LOGGER.trace("Register {} in {}", rel, subset);
 
@@ -1303,6 +1280,10 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
       // ignore
     }
 
+    if (ruleDriver != null) {
+      ruleDriver.onProduce(rel, subset);
+    }
+
     return subset;
   }
 
@@ -1397,6 +1378,79 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     this.locked = locked;
   }
 
+  /**
+   * Decide whether a rule is logical or not.
+   * @param rel The specific rel node
+   * @return True if the relnode is a logical node
+   */
+  @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+  public boolean isLogical(RelNode rel) {
+    return !(rel instanceof PhysicalNode)
+        && rel.getConvention() != rootConvention;
+  }
+
+  /**
+   * Check whether a rule match is a substitute rule match.
+   * @param match The rule match to check
+   * @return True if the rule match is a substitute rule match
+   */
+  @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+  protected boolean isSubstituteRule(VolcanoRuleCall match) {
+    return match.getRule() instanceof SubstitutionRule;
+  }
+
+  /**
+   * Check whether a rule match is a transformation rule match.
+   * @param match The rule match to check
+   * @return True if the rule match is a transformation rule match
+   */
+  @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+  protected boolean isTransformationRule(VolcanoRuleCall match) {
+    if (match.getRule() instanceof SubstitutionRule) {
+      return true;
+    }
+    if (match.getRule() instanceof ConverterRule
+        && match.getRule().getOutTrait() == rootConvention) {
+      return false;
+    }
+    return match.getRule().getOperand().trait == Convention.NONE
+        || match.getRule().getOperand().trait == null;
+  }
+
+
+  /**
+   * Gets the lower bound cost of a relational operator.
+   * @param rel The rel node
+   * @return The lower bound cost of the given rel. The value is ensured NOT NULL.
+   */
+  @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+  protected RelOptCost getLowerBound(RelNode rel) {
+    RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
+    RelOptCost lowerBound = mq.getLowerBoundCost(rel, this);
+    if (lowerBound == null) {
+      return zeroCost;
+    }
+    return lowerBound;
+  }
+
+  /**
+   * Gets the upper bound of its inputs.
+   * Allow users to overwrite this method as some implementations may have
+   * different cost model on some RelNodes, like Spool.
+   */
+  @API(since = "1.24", status = API.Status.EXPERIMENTAL)
+  protected RelOptCost upperBoundForInputs(
+      RelNode mExpr, RelOptCost upperBound) {
+    if (!upperBound.isInfinite()) {
+      RelOptCost rootCost = mExpr.getCluster()
+          .getMetadataQuery().getNonCumulativeCost(mExpr);
+      if (!rootCost.isInfinite()) {
+        return upperBound.minus(rootCost);
+      }
+    }
+    return upperBound;
+  }
+
   //~ Inner Classes ----------------------------------------------------------
 
   /**
@@ -1422,7 +1476,7 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
               getOperand0(),
               rels,
               nodeInputs);
-      volcanoPlanner.ruleQueue.addMatch(match);
+      volcanoPlanner.ruleDriver.getRuleQueue().addMatch(match);
     }
   }
 
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/BuiltInMetadata.java b/core/src/main/java/org/apache/calcite/rel/metadata/BuiltInMetadata.java
index 4cb3ba4..aada556 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/BuiltInMetadata.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/BuiltInMetadata.java
@@ -17,8 +17,8 @@
 package org.apache.calcite.rel.metadata;
 
 import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
@@ -622,6 +622,21 @@ public abstract class BuiltInMetadata {
     }
   }
 
+  /** Metadata to get the lower bound cost of a RelNode. */
+  public interface LowerBoundCost extends Metadata {
+    MetadataDef<LowerBoundCost> DEF = MetadataDef.of(LowerBoundCost.class,
+        LowerBoundCost.Handler.class, BuiltInMethod.LOWER_BOUND_COST.method);
+
+    /** Returns the lower bound cost of a RelNode. */
+    RelOptCost getLowerBoundCost(VolcanoPlanner planner);
+
+    /** Handler API. */
+    interface Handler extends MetadataHandler<LowerBoundCost> {
+      RelOptCost getLowerBoundCost(
+          RelNode r, RelMetadataQuery mq, VolcanoPlanner planner);
+    }
+  }
+
   /** Metadata about the memory use of an operator. */
   public interface Memory extends Metadata {
     MetadataDef<Memory> DEF = MetadataDef.of(Memory.class,
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/DefaultRelMetadataProvider.java b/core/src/main/java/org/apache/calcite/rel/metadata/DefaultRelMetadataProvider.java
index 4631092..2729ab2 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/DefaultRelMetadataProvider.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/DefaultRelMetadataProvider.java
@@ -55,6 +55,7 @@ public class DefaultRelMetadataProvider extends ChainedRelMetadataProvider {
             RelMdSize.SOURCE,
             RelMdParallelism.SOURCE,
             RelMdDistribution.SOURCE,
+            RelMdLowerBoundCost.SOURCE,
             RelMdMemory.SOURCE,
             RelMdDistinctRowCount.SOURCE,
             RelMdSelectivity.SOURCE,
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdLowerBoundCost.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdLowerBoundCost.java
new file mode 100644
index 0000000..f7ea60a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdLowerBoundCost.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.metadata;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.BuiltInMetadata.LowerBoundCost;
+import org.apache.calcite.util.BuiltInMethod;
+
+/**
+ * Default implementations of the
+ * {@link BuiltInMetadata.LowerBoundCost}
+ * metadata provider for the standard algebra.
+ */
+public class RelMdLowerBoundCost implements MetadataHandler<LowerBoundCost> {
+
+  public static final RelMetadataProvider SOURCE =
+      ReflectiveRelMetadataProvider.reflectiveSource(
+          new RelMdLowerBoundCost(), BuiltInMethod.LOWER_BOUND_COST.method);
+
+  //~ Constructors -----------------------------------------------------------
+
+  protected RelMdLowerBoundCost() {}
+
+  //~ Methods ----------------------------------------------------------------
+
+  public MetadataDef<LowerBoundCost> getDef() {
+    return BuiltInMetadata.LowerBoundCost.DEF;
+  }
+
+  public RelOptCost getLowerBoundCost(RelSubset subset,
+      RelMetadataQuery mq, VolcanoPlanner planner) {
+
+    if (planner.isLogical(subset)) {
+      // currently only support physical, will improve in the future
+      return null;
+    }
+
+    return subset.getWinnerCost();
+  }
+
+  public RelOptCost getLowerBoundCost(RelNode node,
+      RelMetadataQuery mq, VolcanoPlanner planner) {
+    if (planner.isLogical(node)) {
+      // currently only support physical, will improve in the future
+      return null;
+    }
+
+    RelOptCost selfCost = mq.getNonCumulativeCost(node);
+    if (selfCost.isInfinite()) {
+      selfCost = null;
+    }
+    for (RelNode input : node.getInputs()) {
+      RelOptCost lb = mq.getLowerBoundCost(input, planner);
+      if (lb != null) {
+        selfCost = selfCost == null ? lb : selfCost.plus(lb);
+      }
+    }
+    return selfCost;
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataQuery.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataQuery.java
index aadc8e5..f27b085 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataQuery.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataQuery.java
@@ -19,6 +19,7 @@ package org.apache.calcite.rel.metadata;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPredicateList;
 import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
@@ -97,6 +98,7 @@ public class RelMetadataQuery extends RelMetadataQueryBase {
   private BuiltInMetadata.Selectivity.Handler selectivityHandler;
   private BuiltInMetadata.Size.Handler sizeHandler;
   private BuiltInMetadata.UniqueKeys.Handler uniqueKeysHandler;
+  private BuiltInMetadata.LowerBoundCost.Handler lowerBoundCostHandler;
 
   /**
    * Creates the instance with {@link JaninoRelMetadataProvider} instance
@@ -134,6 +136,7 @@ public class RelMetadataQuery extends RelMetadataQueryBase {
     this.selectivityHandler = initialHandler(BuiltInMetadata.Selectivity.Handler.class);
     this.sizeHandler = initialHandler(BuiltInMetadata.Size.Handler.class);
     this.uniqueKeysHandler = initialHandler(BuiltInMetadata.UniqueKeys.Handler.class);
+    this.lowerBoundCostHandler = initialHandler(BuiltInMetadata.LowerBoundCost.Handler.class);
   }
 
   private RelMetadataQuery(JaninoRelMetadataProvider metadataProvider,
@@ -162,6 +165,7 @@ public class RelMetadataQuery extends RelMetadataQueryBase {
     this.selectivityHandler = prototype.selectivityHandler;
     this.sizeHandler = prototype.sizeHandler;
     this.uniqueKeysHandler = prototype.uniqueKeysHandler;
+    this.lowerBoundCostHandler = prototype.lowerBoundCostHandler;
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -843,4 +847,18 @@ public class RelMetadataQuery extends RelMetadataQueryBase {
       }
     }
   }
+
+  /**
+   * Returns the lower bound cost of a RelNode.
+   */
+  public RelOptCost getLowerBoundCost(RelNode rel, VolcanoPlanner planner) {
+    for (;;) {
+      try {
+        return lowerBoundCostHandler.getLowerBoundCost(rel, this, planner);
+      } catch (JaninoRelMetadataProvider.NoHandler e) {
+        lowerBoundCostHandler =
+            revise(e.relClass, BuiltInMetadata.LowerBoundCost.DEF);
+      }
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/tools/Programs.java b/core/src/main/java/org/apache/calcite/tools/Programs.java
index 2432a45..7626fa8 100644
--- a/core/src/main/java/org/apache/calcite/tools/Programs.java
+++ b/core/src/main/java/org/apache/calcite/tools/Programs.java
@@ -251,8 +251,6 @@ public class Programs {
   public static Program standard(RelMetadataProvider metadataProvider) {
     final Program program1 =
         (planner, rel, requiredOutputTraits, materializations, lattices) -> {
-          planner.setRoot(rel);
-
           for (RelOptMaterialization materialization : materializations) {
             planner.addMaterialization(materialization);
           }
@@ -260,6 +258,7 @@ public class Programs {
             planner.addLattice(lattice);
           }
 
+          planner.setRoot(rel);
           final RelNode rootRel2 =
               rel.getTraitSet().equals(requiredOutputTraits)
                   ? rel
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 4f92c5f..e0ca291 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -51,6 +51,7 @@ import org.apache.calcite.linq4j.function.Predicate2;
 import org.apache.calcite.linq4j.tree.FunctionExpression;
 import org.apache.calcite.linq4j.tree.Primitive;
 import org.apache.calcite.linq4j.tree.Types;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.AllPredicates;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.Collation;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.ColumnOrigin;
@@ -60,6 +61,7 @@ import org.apache.calcite.rel.metadata.BuiltInMetadata.DistinctRowCount;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.Distribution;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.ExplainVisibility;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.ExpressionLineage;
+import org.apache.calcite.rel.metadata.BuiltInMetadata.LowerBoundCost;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.MaxRowCount;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.Memory;
 import org.apache.calcite.rel.metadata.BuiltInMetadata.MinRowCount;
@@ -553,6 +555,8 @@ public enum BuiltInMethod {
   AVERAGE_COLUMN_SIZES(Size.class, "averageColumnSizes"),
   IS_PHASE_TRANSITION(Parallelism.class, "isPhaseTransition"),
   SPLIT_COUNT(Parallelism.class, "splitCount"),
+  LOWER_BOUND_COST(LowerBoundCost.class, "getLowerBoundCost",
+      VolcanoPlanner.class),
   MEMORY(Memory.class, "memory"),
   CUMULATIVE_MEMORY_WITHIN_PHASE(Memory.class, "cumulativeMemoryWithinPhase"),
   CUMULATIVE_MEMORY_WITHIN_PHASE_SPLIT(Memory.class,
diff --git a/core/src/test/java/org/apache/calcite/plan/volcano/VolcanoPlannerTest.java b/core/src/test/java/org/apache/calcite/plan/volcano/VolcanoPlannerTest.java
index 8f200ec..9d1371f 100644
--- a/core/src/test/java/org/apache/calcite/plan/volcano/VolcanoPlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/plan/volcano/VolcanoPlannerTest.java
@@ -37,6 +37,7 @@ import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.rules.CoreRules;
 import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Pair;
 
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -64,7 +65,7 @@ import static org.apache.calcite.test.Matchers.isLinux;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -501,12 +502,18 @@ class VolcanoPlannerTest {
 
     // verify that the rule match cannot be popped,
     // as the related node has been pruned
+    RuleQueue ruleQueue = planner.ruleDriver.getRuleQueue();
     while (true) {
-      VolcanoRuleMatch ruleMatch = planner.ruleQueue.popMatch(VolcanoPlannerPhase.OPTIMIZE);
+      VolcanoRuleMatch ruleMatch;
+      if (ruleQueue instanceof IterativeRuleQueue) {
+        ruleMatch = ((IterativeRuleQueue) ruleQueue).popMatch(VolcanoPlannerPhase.OPTIMIZE);
+      } else {
+        ruleMatch = ((TopDownRuleQueue) ruleQueue).popMatch(Pair.of(leafRel, null));
+      }
       if (ruleMatch == null) {
         break;
       }
-      assertFalse(ruleMatch.rels[0] == leafRel);
+      assertNotSame(leafRel, ruleMatch.rels[0]);
     }
   }
 
diff --git a/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java b/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
index d92a451..2b32b52 100644
--- a/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/JdbcAdapterTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.test;
 
+import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.test.CalciteAssert.AssertThat;
 import org.apache.calcite.test.CalciteAssert.DatabaseInstance;
@@ -179,6 +180,7 @@ class JdbcAdapterTest {
 
   @Test void testPushDownSort() {
     CalciteAssert.model(JdbcTest.SCOTT_MODEL)
+        .with(CalciteConnectionProperty.TOPDOWN_OPT.camelName(), false)
         .query("select ename\n"
             + "from scott.emp\n"
             + "order by empno")
@@ -211,6 +213,7 @@ class JdbcAdapterTest {
         + "GROUP BY \"JOB\", \"DEPTNO\"\n"
         + "ORDER BY \"DEPTNO\" NULLS LAST, \"JOB\" NULLS LAST";
     CalciteAssert.model(JdbcTest.SCOTT_MODEL)
+        .with(CalciteConnectionProperty.TOPDOWN_OPT.camelName(), false)
         .query(sql)
         .explainContains(explain)
         .runs()
diff --git a/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml b/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml
index f2507fa..0be66a5 100644
--- a/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml
@@ -51,8 +51,8 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-EnumerableProject(MGR=[$3])
-  EnumerableSort(sort0=[$3], dir0=[DESC-nulls-last])
+EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
+  EnumerableProject(MGR=[$3])
     EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
 ]]>
     </Resource>
@@ -77,8 +77,8 @@ EnumerableSortedAggregate(group=[{0, 1, 2}])
   EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC])
     EnumerableProject(ENAME=[$0], EXPR$1=[*($2, -2)], MGR=[$1])
       EnumerableLimit(fetch=[100])
-        EnumerableProject(ENAME=[$1], MGR=[$3], SAL=[$5])
-          EnumerableSort(sort0=[$1], sort1=[$3], sort2=[$5], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+        EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+          EnumerableProject(ENAME=[$1], MGR=[$3], SAL=[$5])
             EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
 ]]>
     </Resource>
@@ -147,8 +147,8 @@ LogicalSort(sort0=[$1], dir0=[DESC])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-EnumerableProject(DEPTNO=[$7], EXPR$1=[CAST($7):FLOAT NOT NULL])
-  EnumerableSort(sort0=[$7], dir0=[DESC])
+EnumerableSort(sort0=[$1], dir0=[DESC])
+  EnumerableProject(DEPTNO=[$7], EXPR$1=[CAST($7):FLOAT NOT NULL])
     EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
 ]]>
     </Resource>
@@ -233,8 +233,8 @@ EnumerableProject(ENAME=[$0], JOB=[$1], EXPR$2=[$2], ENAME0=[$3], JOB0=[$4], SAL
       EnumerableSortedAggregate(group=[{1, 2}], MAX_SAL=[MAX($5)])
         EnumerableSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])
           EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
-    EnumerableProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], JOB0=[CAST($1):VARCHAR NOT NULL])
-      EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+    EnumerableSort(sort0=[$4], sort1=[$0], dir0=[ASC], dir1=[ASC])
+      EnumerableProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], JOB0=[CAST($1):VARCHAR NOT NULL])
         EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
     </Resource>
@@ -267,8 +267,8 @@ EnumerableProject(ENAME=[$0], JOB=[$1], EXPR$2=[$2], ENAME0=[$3], JOB0=[$4], SAL
         EnumerableLimit(fetch=[100])
           EnumerableProject(ENAME=[$1], JOB=[$2], SAL=[$5])
             EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
-    EnumerableProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], JOB0=[CAST($1):VARCHAR NOT NULL])
-      EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+    EnumerableSort(sort0=[$4], sort1=[$0], dir0=[ASC], dir1=[ASC])
+      EnumerableProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], JOB0=[CAST($1):VARCHAR NOT NULL])
         EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
     </Resource>
@@ -670,8 +670,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
     <Resource name="planAfter">
       <![CDATA[
 EnumerableHashJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
-  EnumerableProject(CONTACTNO=[$0], EMAIL=[$3])
-    EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+  EnumerableSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
+    EnumerableProject(CONTACTNO=[$0], EMAIL=[$3])
       EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
   EnumerableProject(ACCTNO=[$0], TYPE=[$1])
     EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
@@ -701,8 +701,8 @@ LogicalProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5
       <![CDATA[
 EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5], unit=[$6], COORD_NE=[ROW($7, ROW($8, $9))], ACCTNO=[$10], TYPE=[$11], BALANCE=[$12])
   EnumerableHashJoin(condition=[AND(=($0, $10), =($3, $11))], joinType=[left])
-    EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
-      EnumerableSort(sort0=[$1], dir0=[DESC])
+    EnumerableSort(sort0=[$1], dir0=[DESC])
+      EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
         EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
     EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
 ]]>
@@ -730,8 +730,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
     <Resource name="planAfter">
       <![CDATA[
 EnumerableHashJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
-  EnumerableProject(CONTACTNO=[$0], EMAIL=[$3])
-    EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+  EnumerableSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
+    EnumerableProject(CONTACTNO=[$0], EMAIL=[$3])
       EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
   EnumerableProject(ACCTNO=[$0], TYPE=[$1])
     EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
@@ -790,8 +790,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
       <![CDATA[
 EnumerableHashJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])
   EnumerableLimit(fetch=[10])
-    EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
-      EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+    EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+      EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
         EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
   EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
@@ -819,8 +819,8 @@ LogicalSort(sort0=[$2], dir0=[DESC])
       <![CDATA[
 EnumerableHashJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])
   EnumerableLimit(fetch=[10])
-    EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
-      EnumerableSort(sort0=[$3], dir0=[DESC])
+    EnumerableSort(sort0=[$2], dir0=[DESC])
+      EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
         EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
   EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
@@ -849,8 +849,8 @@ LogicalSort(sort0=[$2], dir0=[ASC])
 EnumerableSort(sort0=[$2], dir0=[ASC])
   EnumerableHashJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])
     EnumerableLimit(fetch=[10])
-      EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
-        EnumerableSort(sort0=[$3], dir0=[DESC])
+      EnumerableSort(sort0=[$2], dir0=[DESC])
+        EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
           EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
     EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
@@ -878,8 +878,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
       <![CDATA[
 EnumerableNestedLoopJoin(condition=[AND(>($0, $3), <($1, $4))], joinType=[inner])
   EnumerableLimit(fetch=[10])
-    EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
-      EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+    EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+      EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
         EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
   EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
@@ -907,8 +907,8 @@ LogicalSort(sort0=[$2], dir0=[ASC])
       <![CDATA[
 EnumerableNestedLoopJoin(condition=[AND(>($0, $3), <($1, $4))], joinType=[inner])
   EnumerableLimit(fetch=[10])
-    EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
-      EnumerableSort(sort0=[$3], dir0=[ASC])
+    EnumerableSort(sort0=[$2], dir0=[ASC])
+      EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
         EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
   EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
@@ -937,8 +937,8 @@ LogicalSort(sort0=[$2], dir0=[DESC])
 EnumerableSort(sort0=[$2], dir0=[DESC])
   EnumerableNestedLoopJoin(condition=[AND(>($0, $3), <($1, $4))], joinType=[inner])
     EnumerableLimit(fetch=[10])
-      EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
-        EnumerableSort(sort0=[$3], dir0=[ASC])
+      EnumerableSort(sort0=[$2], dir0=[ASC])
+        EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
           EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
     EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
@@ -967,8 +967,8 @@ LogicalProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5
       <![CDATA[
 EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5], unit=[$6], COORD_NE=[ROW($7, ROW($8, $9))], ACCTNO=[$10], TYPE=[$11], BALANCE=[$12])
   EnumerableNestedLoopJoin(condition=[AND(>($0, $10), <($3, $11))], joinType=[left])
-    EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
-      EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+    EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+      EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
         EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
     EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
 ]]>
@@ -997,8 +997,8 @@ LogicalProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5
       <![CDATA[
 EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5], unit=[$6], COORD_NE=[ROW($7, ROW($8, $9))], ACCTNO=[$10], TYPE=[$11], BALANCE=[$12])
   EnumerableNestedLoopJoin(condition=[AND(>($0, $10), <($3, $11))], joinType=[left])
-    EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
-      EnumerableSort(sort0=[$1], dir0=[DESC])
+    EnumerableSort(sort0=[$1], dir0=[DESC])
+      EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
         EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
     EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
 ]]>
@@ -1054,8 +1054,8 @@ LogicalSort(sort0=[$2], sort1=[$3], dir0=[DESC], dir1=[DESC])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-EnumerableProject(CONTACTNO=[$0], EMAIL=[$3], ACCTNO=[$10], TYPE=[$11])
-  EnumerableSort(sort0=[$10], sort1=[$11], dir0=[DESC], dir1=[DESC])
+EnumerableSort(sort0=[$2], sort1=[$3], dir0=[DESC], dir1=[DESC])
+  EnumerableProject(CONTACTNO=[$0], EMAIL=[$3], ACCTNO=[$10], TYPE=[$11])
     EnumerableNestedLoopJoin(condition=[AND(>($0, $10), <($3, $11))], joinType=[right])
       EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
         EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
@@ -1215,8 +1215,8 @@ LogicalProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5
       <![CDATA[
 EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4], Y=[$5], unit=[$6], COORD_NE=[ROW($7, ROW($8, $9))], ACCTNO=[$10], TYPE=[$11], BALANCE=[$12])
   EnumerableBatchNestedLoopJoin(condition=[AND(>($0, $10), <($3, $11))], joinType=[left], batchSize=[100])
-    EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
-      EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+    EnumerableSort(sort0=[$0], sort1=[$3], dir0=[DESC], dir1=[DESC])
+      EnumerableProject(CONTACTNO=[$0], FNAME=[$1], LNAME=[$2], EMAIL=[$3], X=[$4.X], Y=[$4.Y], unit=[$4.unit], M=[$5.M], A=[$5.SUB.A], B=[$5.SUB.B])
         EnumerableTableScan(table=[[CATALOG, CUSTOMER, CONTACT_PEEK]])
     EnumerableFilter(condition=[OR(AND(>($cor0.CONTACTNO, $0), <($cor0.EMAIL, $1)), AND(>($cor1.CONTACTNO, $0), <($cor1.EMAIL, $1)), AND(>($cor2.CONTACTNO, $0), <($cor2.EMAIL, $1)), AND(>($cor3.CONTACTNO, $0), <($cor3.EMAIL, $1)), AND(>($cor4.CONTACTNO, $0), <($cor4.EMAIL, $1)), AND(>($cor5.CONTACTNO, $0), <($cor5.EMAIL, $1)), AND(>($cor6.CONTACTNO, $0), <($cor6.EMAIL, $1)), AND(>($cor7.CONTACTNO, $0), <($cor7.EMAIL, $1)), AND(>($cor8.CONTACTNO, $0), <($cor8.EMAIL, $1)), AND(>($cor9.CONT [...]
       EnumerableTableScan(table=[[CATALOG, CUSTOMER, ACCOUNT]])
@@ -1245,8 +1245,8 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC], dir1=[DESC])
       <![CDATA[
 EnumerableBatchNestedLoopJoin(condition=[AND(>($0, $3), <($1, $4))], joinType=[inner], batchSize=[100])
   EnumerableLimit(fetch=[10])
-    EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
-      EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+    EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[DESC], dir1=[DESC], dir2=[ASC])
+      EnumerableProject(ENAME=[$1], JOB=[$2], MGR=[$3])
         EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
   EnumerableFilter(condition=[OR(AND(>($cor0.ENAME, $0), <($cor0.JOB, $1)), AND(>($cor1.ENAME, $0), <($cor1.JOB, $1)), AND(>($cor2.ENAME, $0), <($cor2.JOB, $1)), AND(>($cor3.ENAME, $0), <($cor3.JOB, $1)), AND(>($cor4.ENAME, $0), <($cor4.JOB, $1)), AND(>($cor5.ENAME, $0), <($cor5.JOB, $1)), AND(>($cor6.ENAME, $0), <($cor6.JOB, $1)), AND(>($cor7.ENAME, $0), <($cor7.JOB, $1)), AND(>($cor8.ENAME, $0), <($cor8.JOB, $1)), AND(>($cor9.ENAME, $0), <($cor9.JOB, $1)), AND(>($cor10.ENAME, $0), <($c [...]
     EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
@@ -1303,8 +1303,8 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-EnumerableCalc(expr#0..8=[{inputs}], MGR=[$t3])
-  EnumerableSort(sort0=[$3], dir0=[DESC-nulls-last])
+EnumerableSort(sort0=[$0], dir0=[DESC-nulls-last])
+  EnumerableCalc(expr#0..8=[{inputs}], MGR=[$t3])
     EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
 ]]>
     </Resource>
@@ -1359,8 +1359,8 @@ EnumerableSortedAggregate(group=[{0, 1, 2}])
   EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC])
     EnumerableCalc(expr#0..2=[{inputs}], expr#3=[-2], expr#4=[*($t2, $t3)], ENAME=[$t0], EXPR$1=[$t4], MGR=[$t1])
       EnumerableLimit(fetch=[100])
-        EnumerableCalc(expr#0..8=[{inputs}], ENAME=[$t1], MGR=[$t3], SAL=[$t5])
-          EnumerableSort(sort0=[$1], sort1=[$3], sort2=[$5], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+        EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+          EnumerableCalc(expr#0..8=[{inputs}], ENAME=[$t1], MGR=[$t3], SAL=[$t5])
             EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
 ]]>
     </Resource>
@@ -1393,8 +1393,8 @@ EnumerableCalc(expr#0..7=[{inputs}], proj#0..6=[{exprs}])
         EnumerableLimit(fetch=[100])
           EnumerableCalc(expr#0..8=[{inputs}], ENAME=[$t1], JOB=[$t2], SAL=[$t5])
             EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
-    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):VARCHAR NOT NULL], proj#0..4=[{exprs}])
-      EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+    EnumerableSort(sort0=[$4], sort1=[$0], dir0=[ASC], dir1=[ASC])
+      EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):VARCHAR NOT NULL], proj#0..4=[{exprs}])
         EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
 ]]>
     </Resource>
@@ -1451,8 +1451,8 @@ LogicalSort(sort0=[$1], dir0=[DESC])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-EnumerableCalc(expr#0..8=[{inputs}], expr#9=[CAST($t7):FLOAT NOT NULL], EXPR$0=[$t9], DEPTNO=[$t7])
-  EnumerableSort(sort0=[$7], dir0=[DESC])
+EnumerableSort(sort0=[$1], dir0=[DESC])
+  EnumerableCalc(expr#0..8=[{inputs}], expr#9=[CAST($t7):FLOAT NOT NULL], EXPR$0=[$t9], DEPTNO=[$t7])
     EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
 ]]>
     </Resource>
diff --git a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
index d4781e1..bb9fa46 100644
--- a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
+++ b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.adapter.kafka;
 
+import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.test.CalciteAssert;
 
 import com.google.common.io.Resources;
@@ -73,6 +74,7 @@ class KafkaAdapterTest {
 
   @Test void testFilterWithProject() {
     assertModel(MODEL)
+        .with(CalciteConnectionProperty.TOPDOWN_OPT.camelName(), false)
         .query("SELECT STREAM MSG_PARTITION,MSG_OFFSET,MSG_VALUE_BYTES FROM KAFKA.MOCKTABLE"
             + " WHERE MSG_OFFSET>0")
         .limit(1)