Posted to commits@calcite.apache.org by hy...@apache.org on 2020/05/10 21:59:05 UTC

[calcite] branch master updated: [CALCITE-3896] Top down trait request

This is an automated email from the ASF dual-hosted git repository.

hyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new bb48485  [CALCITE-3896] Top down trait request
bb48485 is described below

commit bb484859e115596061c572b689a3b04349299458
Author: Haisheng Yuan <h....@alibaba-inc.com>
AuthorDate: Wed Apr 29 09:56:19 2020 -0500

    [CALCITE-3896] Top down trait request
    
    1. Top-down trait request
    2. Bottom-up trait derivation
    3. Trait enforcement without AbstractConverter
    
    How to use?
    
    1. Enable top-down optimization by calling {VolcanoPlanner#setTopDownOpt(boolean)}
    or by adding 'calcite.planner.topdown.opt=true' to the saffron.properties config file.
    
    2. Let your convention's rel interface extend {PhysicalNode}; see
    {EnumerableRel} as an example.
    
    3. Have each physical operator override one of these two methods:
    {PhysicalNode#passThrough(RelTraitSet)} or
    {PhysicalNode#passThroughTraits(RelTraitSet)}, depending on your needs.
    
    4. Choose the derive mode for each physical operator by overriding
    {PhysicalNode#getDeriveMode()}.
    
    5. If the derive mode is {DeriveMode#OMAKASE}, override the method
    {PhysicalNode#derive(List)} in the physical operator; otherwise, override
    {PhysicalNode#derive(RelTraitSet, int)} or
    {PhysicalNode#deriveTraits(RelTraitSet, int)}.
    
    6. Mark your enforcer operator by overriding {RelNode#isEnforcer()}; see
    {Sort#isEnforcer()} as an example. This is important because it helps
    {VolcanoPlanner} avoid unnecessary trait propagation and derivation, and
    therefore improves optimization efficiency.
    
    7. Implement {Convention#enforce(RelNode, RelTraitSet)} in your convention,
    which generates the appropriate physical enforcer. See
    {EnumerableConvention#enforce(RelNode, RelTraitSet)} as an example. Simply return
    null if you don't want physical trait enforcement.
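
To make step 3 more concrete, here is a minimal, self-contained sketch of the
kind of decision a pass-through method makes for a sort-based aggregate. It is
a toy model, not the Calcite API: plain integer lists stand in for
RelCollation and RelTraitSet, and the class and method names
(PassThroughSketch, passThrough, toInput) are hypothetical. It assumes the
aggregate's first output columns are its group keys and that the required keys
contain no duplicates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of collation pass-through for a sort-based aggregate (not the Calcite API). */
final class PassThroughSketch {

  /**
   * Decides which ordering the aggregate could deliver for a parent request.
   *
   * @param requiredKeys output ordinals the parent wants sorted, in order
   * @param groupCount   number of group keys (output columns 0..groupCount-1)
   * @return the delivered ordering, or null if the request cannot be passed down
   */
  static List<Integer> passThrough(List<Integer> requiredKeys, int groupCount) {
    for (int key : requiredKeys) {
      if (key < 0 || key >= groupCount) {
        // e.g. "group by a,b order by a,b,c": c is not a group key
        return null;
      }
    }
    // e.g. "group by a,b,c order by c,b": keep c,b and append the missing key a
    List<Integer> delivered = new ArrayList<>(requiredKeys);
    for (int k = 0; k < groupCount; k++) {
      if (!delivered.contains(k)) {
        delivered.add(k);
      }
    }
    return delivered;
  }

  /** Maps output ordinals back to input ordinals through the group set. */
  static List<Integer> toInput(List<Integer> outputKeys, List<Integer> groupSet) {
    List<Integer> inputKeys = new ArrayList<>();
    for (int key : outputKeys) {
      inputKeys.add(groupSet.get(key));
    }
    return inputKeys;
  }

  public static void main(String[] args) {
    // Group keys a,b,c are input columns 1,3,4 (made-up ordinals).
    List<Integer> groupSet = Arrays.asList(1, 3, 4);
    List<Integer> delivered = passThrough(Arrays.asList(2, 1), groupSet.size());
    System.out.println(delivered);                    // [2, 1, 0]
    System.out.println(toInput(delivered, groupSet)); // [4, 3, 1]
    System.out.println(passThrough(Arrays.asList(0, 1, 2), 2)); // null
  }
}
```

A null result plays the same role as in the commit: the operator declines to
propagate the request to its input.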
    
    How does it work?
    
    Let S# denote the seed physical operator in a RelSet after logical and physical
    rules transformation, P# denote the physical operator generated by passing down
    parent trait requirements, D# denote the physical operator generated by
    deriving from child delivered traitSets.
    
    The initial rel list state in a RelSet is as follows:
        cursor
          |
          V
         S1, S2
    
    When we create a task for RelSubset1, the task will immediately pass the
    subset's traitSet to seed operators, S1 and S2, now we have:
        cursor
          |
          V
         S1, S2, P1, P2
    
    The subset task will create an optimization task for the relnode pointed to by the
    cursor, and move the cursor to the next available physical operator, S2. The task for
    S1 will continue to optimize its child nodes, which are RelSubsets. After
    the child inputs are optimized, S1 will derive new relnodes from the
    delivered subsets in the input RelSet. Once the task for S1 is completed, we have:
            cursor
              |
              V
         S1, S2, P1, P2, D1
    
    The subset task continues scheduling tasks for S2, P1... until no more
    relnodes are created for the RelSet; then we have:
                                    cursor
                                      |
                                      V
         S1, S2, P1, P2, D1, D2, D3, null
    
    When a task for another subset, RelSubset2, is created, the task will try to pass down
    the subset's traitSet to the seed operators S1 and S2; now the RelSet looks like:
                                    cursor
                                      |
                                      V
         S1, S2, P1, P2, D1, D2, D3, P3, P4
    
    The process continues until no more subsets or relnodes are created for the
    RelSet.
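
The cursor walk described above can be replayed with a small, self-contained
simulation. This is only a sketch under stated assumptions, not the
OptimizeTask implementation: relnodes are plain strings, child optimization is
skipped entirely, and the `derived` map (S1 yields D1, S2 yields D2 and D3) is
a made-up stand-in for trait derivation, chosen so that the list states match
the diagrams. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy replay of the RelSet rel list and cursor (not the OptimizeTask code). */
final class RelSetCursorSketch {
  final List<String> rels = new ArrayList<>();
  final List<String> seeds;
  // Assumption: what each seed derives once its own task completes.
  final Map<String, List<String>> derived;
  int cursor = 0; // index of the next relnode to schedule
  int nextP = 1;  // counter used to name passed-down operators

  RelSetCursorSketch(List<String> seeds, Map<String, List<String>> derived) {
    this.seeds = new ArrayList<>(seeds);
    this.derived = derived;
    rels.addAll(seeds);
  }

  /** A new subset task passes its traitSet to every seed operator. */
  void passDown() {
    for (int i = 0; i < seeds.size(); i++) {
      rels.add("P" + nextP++);
    }
  }

  /** Runs the task for the relnode under the cursor, then advances the cursor. */
  void step() {
    String rel = rels.get(cursor++);
    // Only seeds have entries in the map, so only seeds derive new relnodes.
    rels.addAll(derived.getOrDefault(rel, List.of()));
  }

  /** Keeps scheduling until no relnode is left under the cursor. */
  void drain() {
    while (cursor < rels.size()) {
      step();
    }
  }

  public static void main(String[] args) {
    RelSetCursorSketch set = new RelSetCursorSketch(
        List.of("S1", "S2"),
        Map.of("S1", List.of("D1"), "S2", List.of("D2", "D3")));
    set.passDown(); // task for RelSubset1
    System.out.println(set.rels + " cursor=" + set.cursor);
    set.step();     // task for S1
    System.out.println(set.rels + " cursor=" + set.cursor);
    set.drain();    // tasks for S2, P1, ...
    System.out.println(set.rels + " cursor=" + set.cursor);
    set.passDown(); // task for RelSubset2
    System.out.println(set.rels + " cursor=" + set.cursor);
  }
}
```

A cursor value equal to the list size corresponds to the "null" position in
the diagrams; after the second passDown() the same index points at P3.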
    
    See https://lists.apache.org/thread.html/r38ea71968c069f465921e7197488329c15413b46831c90ad4d48f87e%40%3Cdev.calcite.apache.org%3E
    
    Close #1953
---
 .../adapter/enumerable/EnumerableConvention.java   |  20 ++
 .../adapter/enumerable/EnumerableMergeJoin.java    | 106 ++++++-
 .../calcite/adapter/enumerable/EnumerableRel.java  |  23 +-
 .../adapter/enumerable/EnumerableRules.java        |   3 +
 .../enumerable/EnumerableSortedAggregate.java      |  96 ++++++
 .../enumerable/EnumerableSortedAggregateRule.java  |  61 ++++
 .../calcite/config/CalciteSystemProperty.java      |   9 +
 .../java/org/apache/calcite/plan/Convention.java   |  20 ++
 .../java/org/apache/calcite/plan/DeriveMode.java   |  59 ++++
 .../apache/calcite/plan/volcano/OptimizeTask.java  | 344 +++++++++++++++++++++
 .../org/apache/calcite/plan/volcano/RelSet.java    |  78 ++++-
 .../org/apache/calcite/plan/volcano/RelSubset.java |   5 +
 .../calcite/plan/volcano/VolcanoPlanner.java       |  44 +++
 .../java/org/apache/calcite/rel/PhysicalNode.java  | 154 +++++++++
 .../java/org/apache/calcite/rel/RelCollations.java |  11 +
 .../apache/calcite/util/trace/CalciteTrace.java    |   7 +
 .../plan/volcano/CollationConversionTest.java      |   1 +
 .../apache/calcite/plan/volcano/PlannerTests.java  |   5 +
 .../calcite/plan/volcano/TraitConversionTest.java  |   1 +
 .../org/apache/calcite/test/RelOptTestBase.java    |  22 +-
 .../org/apache/calcite/test/TopDownOptTest.java    | 156 ++++++++++
 .../java/org/apache/calcite/tools/PlannerTest.java |  31 +-
 .../org/apache/calcite/test/TopDownOptTest.xml     | 222 +++++++++++++
 core/src/test/resources/saffron.properties         |  17 +
 .../apache/calcite/adapter/tpcds/TpcdsTest.java    |  50 +--
 25 files changed, 1486 insertions(+), 59 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableConvention.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableConvention.java
index c07bb0b..5b583da 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableConvention.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableConvention.java
@@ -22,6 +22,9 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
 
 /**
  * Family of calling conventions that return results as an
@@ -46,6 +49,23 @@ public enum EnumerableConvention implements Convention {
     return "ENUMERABLE";
   }
 
+  @Override public RelNode enforce(
+      final RelNode input,
+      final RelTraitSet required) {
+    RelNode rel = input;
+    if (input.getConvention() != INSTANCE) {
+      rel = ConventionTraitDef.INSTANCE.convert(
+          input.getCluster().getPlanner(),
+          input, INSTANCE, true);
+    }
+    RelCollation collation = required.getTrait(RelCollationTraitDef.INSTANCE);
+    if (collation != null) {
+      assert !collation.getFieldCollations().isEmpty();
+      rel = EnumerableSort.create(rel, collation, null, null);
+    }
+    return rel;
+  }
+
   public RelTraitDef getTraitDef() {
     return ConventionTraitDef.INSTANCE;
   }
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeJoin.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeJoin.java
index 737867d..4668fbf 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeJoin.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableMergeJoin.java
@@ -22,6 +22,7 @@ import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.plan.DeriveMode;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -40,15 +41,19 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.mapping.Mappings;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /** Implementation of {@link org.apache.calcite.rel.core.Join} in
@@ -64,9 +69,6 @@ public class EnumerableMergeJoin extends Join implements EnumerableRel {
       Set<CorrelationId> variablesSet,
       JoinRelType joinType) {
     super(cluster, traits, ImmutableList.of(), left, right, condition, variablesSet, joinType);
-    final List<RelCollation> collations =
-        traits.getTraits(RelCollationTraitDef.INSTANCE);
-    assert collations == null || RelCollations.contains(collations, joinInfo.leftKeys);
     if (!isMergeJoinSupported(joinType)) {
       throw new UnsupportedOperationException(
           "EnumerableMergeJoin unsupported for join type " + joinType);
@@ -94,6 +96,104 @@ public class EnumerableMergeJoin extends Join implements EnumerableRel {
         CorrelationId.setOf(variablesStopped), joinType);
   }
 
+  @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(
+      final RelTraitSet required) {
+    // Required collation keys can be subset or superset of merge join keys.
+    RelCollation collation = required.getTrait(RelCollationTraitDef.INSTANCE);
+    List<Integer> reqKeys = RelCollations.ordinals(collation);
+    ImmutableBitSet reqKeySet = ImmutableBitSet.of(reqKeys);
+
+    ImmutableBitSet leftKeySet = ImmutableBitSet.of(joinInfo.leftKeys);
+    ImmutableBitSet rightKeySet = ImmutableBitSet.of(joinInfo.rightKeys)
+        .shift(left.getRowType().getFieldCount());
+
+    Map<Integer, Integer> keyMap = new HashMap<>();
+    final int keyCount = leftKeySet.cardinality();
+    for (int i = 0; i < keyCount; i++) {
+      keyMap.put(joinInfo.leftKeys.get(i), joinInfo.rightKeys.get(i));
+    }
+    Mappings.TargetMapping mapping = Mappings.target(keyMap,
+        left.getRowType().getFieldCount(),
+        right.getRowType().getFieldCount());
+
+    // Only consider exact key match for now
+    if (reqKeySet.equals(leftKeySet)) {
+      RelCollation rightCollation = RexUtil.apply(mapping, collation);
+      return Pair.of(
+          required, ImmutableList.of(required,
+          required.replace(rightCollation)));
+    } else if (reqKeySet.equals(rightKeySet)) {
+      RelCollation rightCollation = RelCollations.shift(collation,
+          -left.getRowType().getFieldCount());
+      Mappings.TargetMapping invMapping = mapping.inverse();
+      RelCollation leftCollation = RexUtil.apply(invMapping, rightCollation);
+      return Pair.of(
+          required, ImmutableList.of(
+          required.replace(leftCollation),
+          required.replace(rightCollation)));
+    }
+    // TODO: support subset keys and superset keys
+    return null;
+  }
+
+  @Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(
+      final RelTraitSet childTraits, final int childId) {
+    final int keyCount = joinInfo.leftKeys.size();
+    RelCollation collation = childTraits.getTrait(RelCollationTraitDef.INSTANCE);
+    final int colCount = collation.getFieldCollations().size();
+    if (colCount < keyCount || keyCount == 0) {
+      return null;
+    }
+
+    if (colCount > keyCount) {
+      collation = RelCollations.of(collation.getFieldCollations().subList(0, keyCount));
+    }
+    ImmutableBitSet childCollationKeys = ImmutableBitSet.of(
+        RelCollations.ordinals(collation));
+
+    Map<Integer, Integer> keyMap = new HashMap<>();
+    for (int i = 0; i < keyCount; i++) {
+      keyMap.put(joinInfo.leftKeys.get(i), joinInfo.rightKeys.get(i));
+    }
+
+    Mappings.TargetMapping mapping = Mappings.target(keyMap,
+        left.getRowType().getFieldCount(),
+        right.getRowType().getFieldCount());
+
+    if (childId == 0) {
+      // traits from left child
+      ImmutableBitSet keySet = ImmutableBitSet.of(joinInfo.leftKeys);
+      if (!childCollationKeys.equals(keySet)) {
+        return null;
+      }
+      RelCollation rightCollation = RexUtil.apply(mapping, collation);
+      RelTraitSet joinTraits = getTraitSet().replace(collation);
+
+      // Forget about the equiv keys for the moment
+      return Pair.of(
+          joinTraits, ImmutableList.of(childTraits,
+          right.getTraitSet().replace(rightCollation)));
+    } else {
+      // traits from right child
+      assert childId == 1;
+      ImmutableBitSet keySet = ImmutableBitSet.of(joinInfo.rightKeys);
+      if (!childCollationKeys.equals(keySet)) {
+        return null;
+      }
+      RelCollation leftCollation = RexUtil.apply(mapping.inverse(), collation);
+      RelTraitSet joinTraits = getTraitSet().replace(leftCollation);
+
+      // Forget about the equiv keys for the moment
+      return Pair.of(
+          joinTraits, ImmutableList.of(left.getTraitSet().replace(leftCollation),
+          childTraits.replace(collation)));
+    }
+  }
+
+  @Override public DeriveMode getDeriveMode() {
+    return DeriveMode.BOTH;
+  }
+
   public static EnumerableMergeJoin create(RelNode left, RelNode right,
       RexNode condition, ImmutableIntList leftKeys,
       ImmutableIntList rightKeys, JoinRelType joinType) {
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRel.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRel.java
index c514091..f4a27c0 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRel.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRel.java
@@ -17,7 +17,12 @@
 package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.linq4j.tree.BlockStatement;
-import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.plan.DeriveMode;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
 
 /**
  * A relational expression of one of the
@@ -25,10 +30,24 @@ import org.apache.calcite.rel.RelNode;
  * conventions.
  */
 public interface EnumerableRel
-    extends RelNode {
+    extends PhysicalNode {
 
   //~ Methods ----------------------------------------------------------------
 
+  @Override default Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(
+      RelTraitSet required) {
+    return null;
+  }
+
+  @Override default Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(
+      RelTraitSet childTraits, int childId) {
+    return null;
+  }
+
+  @Override default DeriveMode getDeriveMode() {
+    return DeriveMode.LEFT_FIRST;
+  }
+
   /**
    * Creates a plan for this expression according to a calling convention.
    *
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
index 07ff770..e7f6b31 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
@@ -113,6 +113,9 @@ public class EnumerableRules {
   public static final EnumerableMatchRule ENUMERABLE_MATCH_RULE =
       new EnumerableMatchRule();
 
+  public static final EnumerableSortedAggregateRule ENUMERABLE_SORTED_AGGREGATE_RULE =
+      new EnumerableSortedAggregateRule();
+
   public static final List<RelOptRule> ENUMERABLE_RULES = ImmutableList.of(
       EnumerableRules.ENUMERABLE_JOIN_RULE,
       EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE,
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
new file mode 100644
index 0000000..b1ce693
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mappings;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Sort based physical implementation of {@link Aggregate} in
+ * {@link EnumerableConvention enumerable calling convention}. */
+public class EnumerableSortedAggregate extends Aggregate implements EnumerableRel {
+  public EnumerableSortedAggregate(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      ImmutableBitSet groupSet,
+      List<ImmutableBitSet> groupSets,
+      List<AggregateCall> aggCalls) {
+    super(cluster, traitSet, ImmutableList.of(), input, groupSet, groupSets, aggCalls);
+    assert getConvention() instanceof EnumerableConvention;
+  }
+
+  @Override public EnumerableSortedAggregate copy(RelTraitSet traitSet, RelNode input,
+      ImmutableBitSet groupSet,
+      List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+    return new EnumerableSortedAggregate(getCluster(), traitSet, input,
+        groupSet, groupSets, aggCalls);
+  }
+
+  @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(
+      final RelTraitSet required) {
+    if (!isSimple(this)) {
+      return null;
+    }
+
+    RelTraitSet inputTraits = getInput().getTraitSet();
+    RelCollation collation = required.getTrait(RelCollationTraitDef.INSTANCE);
+    ImmutableBitSet requiredKeys = ImmutableBitSet.of(RelCollations.ordinals(collation));
+    ImmutableBitSet groupKeys = ImmutableBitSet.range(groupSet.cardinality());
+
+    Mappings.TargetMapping mapping = Mappings.source(groupSet.toList(),
+        input.getRowType().getFieldCount());
+
+    if (requiredKeys.equals(groupKeys)) {
+      RelCollation inputCollation = RexUtil.apply(mapping, collation);
+      return Pair.of(required, ImmutableList.of(inputTraits.replace(inputCollation)));
+    } else if (groupKeys.contains(requiredKeys)) {
+      // group by a,b,c order by c,b
+      List<RelFieldCollation> list = new ArrayList<>(collation.getFieldCollations());
+      groupKeys.except(requiredKeys).forEach(k -> list.add(new RelFieldCollation(k)));
+      RelCollation aggCollation = RelCollations.of(list);
+      RelCollation inputCollation = RexUtil.apply(mapping, aggCollation);
+      return Pair.of(traitSet.replace(aggCollation),
+          ImmutableList.of(inputTraits.replace(inputCollation)));
+    }
+
+    // Group keys don't contain all the required keys, e.g.
+    // group by a,b order by a,b,c
+    // nothing we can do to propagate traits to child nodes.
+    return null;
+  }
+
+  public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+    throw Util.needToImplement("EnumerableSortedAggregate");
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregateRule.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregateRule.java
new file mode 100644
index 0000000..2f6e810
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregateRule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.util.ImmutableIntList;
+
+
+/**
+ * Rule to convert a {@link LogicalAggregate}
+ * to an {@link EnumerableSortedAggregate}.
+ */
+class EnumerableSortedAggregateRule extends ConverterRule {
+  EnumerableSortedAggregateRule() {
+    super(LogicalAggregate.class, Convention.NONE,
+        EnumerableConvention.INSTANCE, "EnumerableSortedAggregateRule");
+  }
+
+  public RelNode convert(RelNode rel) {
+    final LogicalAggregate agg = (LogicalAggregate) rel;
+    if (!Aggregate.isSimple(agg)) {
+      return null;
+    }
+    final RelTraitSet inputTraits = rel.getCluster()
+        .traitSet().replace(EnumerableConvention.INSTANCE)
+        .replace(
+            RelCollations.of(
+                ImmutableIntList.copyOf(
+            agg.getGroupSet().asList())));
+    final RelTraitSet selfTraits = inputTraits.replace(
+        RelCollations.of(
+        ImmutableIntList.identity(agg.getGroupSet().cardinality())));
+    return new EnumerableSortedAggregate(
+        rel.getCluster(),
+        selfTraits,
+        convert(agg.getInput(), inputTraits),
+        agg.getGroupSet(),
+        agg.getGroupSets(),
+        agg.getAggCallList());
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java b/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
index 813760b..37e36ac 100644
--- a/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
+++ b/core/src/main/java/org/apache/calcite/config/CalciteSystemProperty.java
@@ -122,6 +122,15 @@ public final class CalciteSystemProperty<T> {
       booleanProperty("calcite.volcano.dump.sets", true);
 
   /**
+   * Whether to enable top-down optimization.
+   *
+   * <p>Note: Enabling top-down optimization will automatically disable
+   * the use of AbstractConverter and related rules.</p>
+   */
+  public static final CalciteSystemProperty<Boolean> TOPDOWN_OPT =
+      booleanProperty("calcite.planner.topdown.opt", false);
+
+  /**
    * Whether to run integration tests.
    */
   // TODO review zabetak:
diff --git a/core/src/main/java/org/apache/calcite/plan/Convention.java b/core/src/main/java/org/apache/calcite/plan/Convention.java
index d8d6795..1d7b5f7 100644
--- a/core/src/main/java/org/apache/calcite/plan/Convention.java
+++ b/core/src/main/java/org/apache/calcite/plan/Convention.java
@@ -38,6 +38,21 @@ public interface Convention extends RelTrait {
   String getName();
 
   /**
+   * Given an input and required traits, returns the corresponding
+   * enforcer rel nodes, like physical Sort, Exchange etc.
+   *
+   * @param input The input RelNode
+   * @param required The required traits
+   * @return Physical enforcer that satisfies the required traitSet,
+   * or {@code null} if trait enforcement is not allowed or the
+   * required traitSet can't be satisfied.
+   */
+  default RelNode enforce(RelNode input, RelTraitSet required) {
+    throw new RuntimeException(getClass().getName()
+        + "#enforce() is not implemented.");
+  }
+
+  /**
    * Returns whether we should convert from this convention to
    * {@code toConvention}. Used by {@link ConventionTraitDef}.
    *
@@ -100,6 +115,11 @@ public interface Convention extends RelTrait {
       return ConventionTraitDef.INSTANCE;
     }
 
+    @Override public RelNode enforce(final RelNode input,
+        final RelTraitSet required) {
+      return null;
+    }
+
     public boolean canConvertConvention(Convention toConvention) {
       return false;
     }
diff --git a/core/src/main/java/org/apache/calcite/plan/DeriveMode.java b/core/src/main/java/org/apache/calcite/plan/DeriveMode.java
new file mode 100644
index 0000000..d6e97eb
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/DeriveMode.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan;
+
+/**
+ * The mode of trait derivation.
+ */
+public enum DeriveMode {
+  /**
+   * Uses the leftmost child's traits to decide what
+   * traits to require from the other children. This
+   * generally applies to most operators.
+   */
+  LEFT_FIRST,
+
+  /**
+   * Uses the rightmost child's traits to decide what
+   * traits to require from the other children. Operators
+   * like index nested loop join may find this useful.
+   */
+  RIGHT_FIRST,
+
+  /**
+   * Iterates over each child, using the current child's traits
+   * to decide what traits to require from the other
+   * children. It includes both LEFT_FIRST and RIGHT_FIRST.
+   * Systems that don't enable join commutativity should
+   * consider this option. Special customized operators,
+   * like a Join that has 3 inputs, may find this useful too.
+   */
+  BOTH,
+
+  /**
+   * Leave it to you, you decide what you cook. This will
+   * allow planner to pass all the traits from all the
+   * children, the user decides how to make use of these
+   * traits and whether to derive new rel nodes.
+   */
+  OMAKASE,
+
+  /**
+   * Trait derivation is prohibited.
+   */
+  PROHIBITED
+}
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java b/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java
new file mode 100644
index 0000000..af651c8
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/OptimizeTask.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.plan.volcano;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.DeriveMode;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.trace.CalciteTrace;
+
+import org.apiguardian.api.API;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * <code>OptimizeTask</code> represents the optimization task
+ * of VolcanoPlanner.
+ *
+ * <p>How does it work?</p>
+ *
+ * <p>Let S# denote the seed physical operator in a RelSet after
+ * logical and physical rules transformation, P# denote the
+ * physical operator generated by passing down parent trait
+ * requirements, D# denote the physical operator generated by
+ * deriving from child delivered traitSets.</p>
+ *
+ * The initial rel list state in a RelSet is as follows:
+ * <pre>
+ *  cursor
+ *    |
+ *    V
+ *   S1, S2
+ * </pre>
+ *
+ * When we create a task for RelSubset1, the task will immediately
+ * pass the subset's traitSet to seed operators, S1 and S2,
+ * now we have:
+ * <pre>
+ *  cursor
+ *    |
+ *    V
+ *   S1, S2, P1, P2
+ * </pre>
+ *
+ * The subset task will create an optimization task for the relnode
+ * pointed to by the cursor, and move the cursor to the next available
+ * physical operator, S2. The task for S1 will continue to optimize its
+ * child nodes, which are RelSubsets. After the child inputs are
+ * optimized, S1 will derive new relnodes from the delivered subsets
+ * in the input RelSet. Once the task for S1 is completed, we have:
+ * <pre>
+ *      cursor
+ *        |
+ *        V
+ *   S1, S2, P1, P2, D1
+ * </pre>
+ *
+ * The subset task continues scheduling tasks for S2, P1... until
+ * no more relnodes are created for the RelSet; then we have:
+ * <pre>
+ *                              cursor
+ *                                |
+ *                                V
+ *   S1, S2, P1, P2, D1, D2, D3, null
+ * </pre>
+ *
+ * When a task for another RelSubset2 is created, the task will try
+ * to pass down the subset's traitSet to seed operator S1 and S2,
+ * now the RelSet looks like:
+ * <pre>
+ *                              cursor
+ *                                |
+ *                                V
+ *   S1, S2, P1, P2, D1, D2, D3, P3, P4
+ * </pre>
+ *
+ * The process continues until no more subsets or relnodes are
+ * created for the RelSet.
+ */
+@API(since = "1.23", status = API.Status.INTERNAL)
+abstract class OptimizeTask {
+
+  static final Logger LOGGER = CalciteTrace.getPlannerTaskTracer();
+
+  static OptimizeTask create(RelNode node) {
+    if (node instanceof RelSubset) {
+      return new RelSubsetOptTask((RelSubset) node);
+    }
+    return new RelNodeOptTask(node);
+  }
+
+  final VolcanoPlanner planner;
+  final int id;
+
+  OptimizeTask(RelNode node) {
+    planner = (VolcanoPlanner) node.getCluster().getPlanner();
+    id = planner.nextTaskId++;
+    LOGGER.debug("Scheduled task(id={}) for {}", id, node);
+  }
+
+  abstract boolean hasSubTask();
+
+  abstract OptimizeTask nextSubTask();
+
+  abstract void execute();
+
+  /**
+   * Task State
+   */
+  public enum State {
+    SCHEDULED,
+    EXECUTING,
+    COMPLETED
+  }
+
+  /**
+   * Task for optimizing RelNode.
+   *
+   * <p>Optimize input RelSubsets and derive new RelNodes
+   * from child traitSets.</p>
+   */
+  static class RelNodeOptTask extends OptimizeTask {
+    final RelNode node;
+    int nextId = 0; // next child index
+
+    RelNodeOptTask(RelNode node) {
+      super(node);
+      this.node = node;
+    }
+
+    @Override boolean hasSubTask() {
+      int size = node.getInputs().size();
+      while (nextId < size) {
+        RelSubset subset = (RelSubset) node.getInput(nextId);
+        if (subset.taskState == null) {
+          // not yet scheduled
+          return true;
+        } else {
+          // maybe a cycle if it is not completed
+          nextId++;
+        }
+      }
+
+      return false;
+    }
+
+    @Override OptimizeTask nextSubTask() {
+      RelNode child = node.getInput(nextId++);
+      return new RelSubsetOptTask((RelSubset) child);
+    }
+
+    @Override void execute() {
+      if (!(node instanceof PhysicalNode)
+          || ((PhysicalNode) node).getDeriveMode() == DeriveMode.PROHIBITED
+          || !planner.isSeedNode(node)) {
+        LOGGER.debug("Completed task(id={}) for {}", id, node);
+        return;
+      }
+
+      PhysicalNode rel = (PhysicalNode) node;
+      DeriveMode mode = rel.getDeriveMode();
+      int arity = node.getInputs().size();
+      // for OMAKASE
+      List<List<RelTraitSet>> inputTraits = new ArrayList<>(arity);
+
+      for (int i = 0; i < arity; i++) {
+        int childId = i;
+        if (mode == DeriveMode.RIGHT_FIRST) {
+          childId = arity - i - 1;
+        }
+
+        RelSubset input = (RelSubset) node.getInput(childId);
+        List<RelTraitSet> traits = new ArrayList<>();
+        inputTraits.add(traits);
+
+        final int numSubset = input.set.subsets.size();
+        for (int j = 0; j < numSubset; j++) {
+          RelSubset subset = input.set.subsets.get(j);
+          if (!subset.isDelivered() || equalsSansConvention(
+              subset.getTraitSet(), rel.getCluster().traitSet())) {
+            // TODO: should use matching type to determine
+            // Ideally we should stop deriving new relnodes when the
+            // subset's traitSet equals the input traitSet. But someone
+            // may manually build a physical relnode tree (which is
+            // highly discouraged) without specifying the correct
+            // traitSet, e.g.
+            //   EnumerableFilter  [].ANY
+            //       -> EnumerableMergeJoin  [a].Hash[a]
+            // In that case we should still be able to derive the
+            // correct traitSet for the dumb filter, even though the
+            // filter's traitSet should have been derived from the
+            // MergeJoin when it was created. However, if the subset's
+            // traitSet equals the default empty traitSet sans
+            // convention (the cluster's default traitSet may carry the
+            // logical convention NONE, which is not interesting), it is
+            // safe to ignore it: a physical filter with a non-default
+            // traitSet over an input with the default empty traitSet, e.g.
+            //   EnumerableFilter  [a].Hash[a]
+            //       -> EnumerableProject  [].ANY
+            // is definitely wrong, and we should fail fast.
+            continue;
+          }
+
+          if (mode == DeriveMode.OMAKASE) {
+            traits.add(subset.getTraitSet());
+          } else {
+            RelNode newRel = rel.derive(subset.getTraitSet(), childId);
+            if (newRel != null && !planner.isRegistered(newRel)) {
+              RelSubset relSubset = planner.register(newRel, node);
+              assert relSubset.set == planner.getSubset(node).set;
+            }
+          }
+        }
+
+        if (mode == DeriveMode.LEFT_FIRST
+            || mode == DeriveMode.RIGHT_FIRST) {
+          break;
+        }
+      }
+
+      if (mode == DeriveMode.OMAKASE) {
+        List<RelNode> relList = rel.derive(inputTraits);
+        for (RelNode relNode : relList) {
+          if (!planner.isRegistered(relNode)) {
+            planner.register(relNode, node);
+          }
+        }
+      }
+
+      LOGGER.debug("Completed task(id={}) for {}", id, node);
+    }
+
+    /**
+     * Returns whether the 2 traitSets are equal without Convention.
+     * It assumes they have the same traitDefs order.
+     */
+    private boolean equalsSansConvention(RelTraitSet ts1, RelTraitSet ts2) {
+      assert ts1.size() == ts2.size();
+      for (int i = 0; i < ts1.size(); i++) {
+        RelTrait trait = ts1.getTrait(i);
+        if (trait.getTraitDef() == ConventionTraitDef.INSTANCE) {
+          continue;
+        }
+        if (!trait.equals(ts2.getTrait(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override public String toString() {
+      return "Task#" + id + ":{ " + node + " }";
+    }
+  }
+
+  /**
+   * Task for optimizing RelSubset.
+   *
+   * <p>Pass down the trait requirements of current RelSubset
+   * and add enforcers to the new delivered subsets.</p>
+   */
+  static class RelSubsetOptTask extends OptimizeTask {
+    final RelSubset subset;
+    final Set<RelSubset> deliveredSubsets = new HashSet<>();
+
+    RelSubsetOptTask(RelSubset subset) {
+      super(subset);
+      this.subset = subset;
+      subset.taskState = State.SCHEDULED;
+      propagateTraits();
+    }
+
+    private void propagateTraits() {
+      int size = subset.set.getSeedSize();
+
+      for (int i = 0; i < size; i++) {
+        RelNode rel = subset.set.rels.get(i);
+        if (!(rel instanceof PhysicalNode)
+            || rel.getConvention() == Convention.NONE
+            || rel.getTraitSet().satisfies(subset.getTraitSet())) {
+          continue;
+        }
+
+        RelNode node = ((PhysicalNode) rel).passThrough(
+            subset.getTraitSet());
+        if (node != null && !planner.isRegistered(node)) {
+          RelSubset newSubset = planner.register(node, subset);
+          deliveredSubsets.add(newSubset);
+        } else {
+          // TODO: should we consider stopping propagation on a node
+          // with the same traitset as phyNode?
+          assert true;
+        }
+      }
+    }
+
+    @Override boolean hasSubTask() {
+      return subset.set.hasNextPhysicalNode();
+    }
+
+    @Override OptimizeTask nextSubTask() {
+      RelNode rel = subset.set.nextPhysicalNode();
+      return new RelNodeOptTask(rel);
+    }
+
+    @Override void execute() {
+      subset.taskState = State.EXECUTING;
+
+      subset.set.addConverters(subset, true, false);
+
+      for (RelSubset delivered : deliveredSubsets) {
+        subset.set.addConverters(delivered, false, false);
+      }
+
+      subset.taskState = State.COMPLETED;
+      LOGGER.debug("Completed task(id={}) for {}", id, subset);
+    }
+
+    @Override public String toString() {
+      return "Task#" + id + ":{ " + subset + " }";
+    }
+  }
+}
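
Reviewer note: the cursor walk described in the OptimizeTask Javadoc above can be sketched with a toy model. This is not Calcite code. The class RelSetCursorSketch, the string labels and the rule "each seed derives exactly one node" are assumptions made only for illustration. It shows why an index cursor is used: the rel list keeps growing while it is being walked.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the RelSet rel list and its cursor; not Calcite code. */
final class RelSetCursorSketch {
  /** Rel list of the set, modeled as plain labels such as "S1". */
  final List<String> rels = new ArrayList<>();
  /** Index of the next rel to hand out, playing the role of relCursor. */
  int cursor = 0;

  boolean hasNext() {
    return cursor < rels.size();
  }

  String next() {
    return rels.get(cursor++);
  }

  /** Walks the list while it grows and returns the visiting order. */
  static List<String> walk() {
    RelSetCursorSketch set = new RelSetCursorSketch();
    // Seed operators left behind by logical and physical rules.
    set.rels.add("S1");
    set.rels.add("S2");
    // Creating the subset task passes the required traits to the seeds.
    set.rels.add("P1");
    set.rels.add("P2");

    List<String> visited = new ArrayList<>();
    int derived = 0;
    while (set.hasNext()) {
      String rel = set.next();
      visited.add(rel);
      // Assumption for the demo: each seed derives exactly one new node,
      // and only seeds derive (compare the isSeedNode check).
      if (rel.startsWith("S")) {
        set.rels.add("D" + (++derived));
      }
    }
    return visited;
  }

  public static void main(String[] args) {
    System.out.println(walk());
  }
}
```

Because the walk is driven by an index rather than an iterator, nodes appended during the walk are picked up later without any concurrent-modification problem.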
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java b/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
index 22cb936..04fa187 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
@@ -23,11 +23,13 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.Converter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Spool;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.trace.CalciteTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -74,6 +76,23 @@ class RelSet {
   RelNode rel;
 
   /**
+   * The position of the next rel node to be processed.
+   */
+  private int relCursor = 0;
+
+  /**
+   * The relnodes after applying logical rules and physical rules,
+   * before trait propagation and enforcement.
+   */
+  final Set<RelNode> seeds = new HashSet<>();
+
+  /**
+   * Records conversions / enforcements that have happened on the
+   * pair of derived and required traitset.
+   */
+  final Set<Pair<RelTraitSet, RelTraitSet>> conversions = new HashSet<>();
+
+  /**
    * Variables that are set by relational expressions in this set
    * and available for use by parent and child expressions.
    */
@@ -147,6 +166,32 @@ class RelSet {
     return null;
   }
 
+  public int getSeedSize() {
+    if (seeds.isEmpty()) {
+      seeds.addAll(rels);
+    }
+    return seeds.size();
+  }
+
+  public boolean hasNextPhysicalNode() {
+    while (relCursor < rels.size()) {
+      RelNode node = rels.get(relCursor);
+      if (node instanceof PhysicalNode
+          && node.getConvention() != Convention.NONE) {
+        // enforcer may be manually created for some reason
+        if (relCursor < getSeedSize() || !node.isEnforcer()) {
+          return true;
+        }
+      }
+      relCursor++;
+    }
+    return false;
+  }
+
+  public RelNode nextPhysicalNode() {
+    return rels.get(relCursor++);
+  }
+
   /**
    * Removes all references to a specific {@link RelNode} in both the subsets
    * and their parent relationships.
@@ -174,8 +219,9 @@ class RelSet {
    * Otherwise, convert this subset to required subsets in this RelSet.
    * The subset can be both required and delivered.
    */
-  private void addAbstractConverters(
-      RelOptCluster cluster, RelSubset subset, boolean required) {
+  void addConverters(RelSubset subset, boolean required,
+      boolean useAbstractConverter) {
+    RelOptCluster cluster = subset.getCluster();
     List<RelSubset> others = subsets.stream().filter(
         n -> required ? n.isDelivered() : n.isRequired())
         .collect(Collectors.toList());
@@ -190,12 +236,16 @@ class RelSet {
         to = subset;
       }
 
-      if (from == to || !from.getConvention()
-          .useAbstractConvertersForConversion(
+      if (from == to || useAbstractConverter
+          && !from.getConvention().useAbstractConvertersForConversion(
               from.getTraitSet(), to.getTraitSet())) {
         continue;
       }
 
+      if (!conversions.add(Pair.of(from.getTraitSet(), to.getTraitSet()))) {
+        continue;
+      }
+
       final ImmutableList<RelTrait> difference =
           to.getTraitSet().difference(from.getTraitSet());
 
@@ -217,9 +267,17 @@ class RelSet {
       }
 
       if (needsConverter) {
-        final AbstractConverter converter =
-            new AbstractConverter(cluster, from, null, to.getTraitSet());
-        cluster.getPlanner().register(converter, to);
+        final RelNode enforcer;
+        if (useAbstractConverter) {
+          enforcer = new AbstractConverter(
+              cluster, from, null, to.getTraitSet());
+        } else {
+          enforcer = subset.getConvention().enforce(from, to.getTraitSet());
+        }
+
+        if (enforcer != null) {
+          cluster.getPlanner().register(enforcer, to);
+        }
       }
     }
   }
@@ -231,6 +289,7 @@ class RelSet {
   RelSubset getOrCreateSubset(
       RelOptCluster cluster, RelTraitSet traits, boolean required) {
     boolean needsConverter = false;
+    final VolcanoPlanner planner = (VolcanoPlanner) cluster.getPlanner();
     RelSubset subset = getSubset(traits);
 
     if (subset == null) {
@@ -242,7 +301,6 @@ class RelSet {
       // register() the planner will try to add this subset again.
       subsets.add(subset);
 
-      final VolcanoPlanner planner = (VolcanoPlanner) cluster.getPlanner();
       if (planner.getListener() != null) {
         postEquivalenceEvent(planner, subset);
       }
@@ -259,8 +317,8 @@ class RelSet {
       subset.setDelivered();
     }
 
-    if (needsConverter) {
-      addAbstractConverters(cluster, subset, required);
+    if (needsConverter && !planner.topDownOpt) {
+      addConverters(subset, required, true);
     }
 
     return subset;
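
Reviewer note: the conversions set added to RelSet above relies on Set#add returning false for a pair that was already recorded. A minimal sketch of that idiom follows. It is not Calcite code: trait sets are modeled as strings, and java.util.AbstractMap.SimpleImmutableEntry stands in for Calcite's Pair. Both substitutions are assumptions made only for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the (from, to) conversion bookkeeping; not Calcite code. */
final class ConversionDedupSketch {
  /** Pairs of (delivered traits, required traits) already handled. */
  private final Set<Map.Entry<String, String>> conversions = new HashSet<>();
  /** Counts how many enforcers would have been created. */
  int enforcersCreated = 0;

  /** Returns true if an enforcer is created for this (from, to) pair. */
  boolean addConverter(String from, String to) {
    if (from.equals(to)) {
      return false; // nothing to convert
    }
    // Set#add returns false when the pair is already present,
    // so a repeated request is skipped without a separate lookup.
    if (!conversions.add(new SimpleImmutableEntry<>(from, to))) {
      return false;
    }
    enforcersCreated++;
    return true;
  }

  public static void main(String[] args) {
    ConversionDedupSketch set = new ConversionDedupSketch();
    System.out.println(set.addConverter("[]", "[a]"));
    System.out.println(set.addConverter("[]", "[a]"));
    System.out.println(set.enforcersCreated);
  }
}
```

The pair is ordered, so a conversion in the opposite direction is recorded as a separate entry.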
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java b/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
index b94f64b..ad37268 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java
@@ -82,6 +82,11 @@ public class RelSubset extends AbstractRelNode {
   //~ Instance fields --------------------------------------------------------
 
   /**
+   * Optimization task state
+   */
+  OptimizeTask.State taskState;
+
+  /**
    * cost of best known plan (it may have improved since)
    */
   RelOptCost bestCost;
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
index 89a5b4e..20a1ef0 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java
@@ -180,6 +180,21 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
    * {@link org.apache.calcite.plan.volcano.VolcanoCost}. */
   private final RelOptCost zeroCost;
 
+  /**
+   * Optimization tasks including trait propagation, enforcement.
+   */
+  final Deque<OptimizeTask> tasks = new ArrayDeque<>();
+
+  /**
+   * The id generator for optimization tasks.
+   */
+  int nextTaskId = 0;
+
+  /**
+   * Whether to enable top-down optimization or not.
+   */
+  boolean topDownOpt = CalciteSystemProperty.TOPDOWN_OPT.value();
+
   //~ Constructors -----------------------------------------------------------
 
   /**
@@ -225,6 +240,16 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     };
   }
 
+  /**
+   * Enable or disable top-down optimization.
+   *
+   * <p>Note: Enabling top-down optimization will automatically disable
+   * the use of AbstractConverter and related rules.</p>
+   */
+  public void setTopDownOpt(boolean value) {
+    topDownOpt = value;
+  }
+
   // implement RelOptPlanner
   public boolean isRegistered(RelNode rel) {
     return mapRel2Subset.get(rel) != null;
@@ -505,6 +530,20 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
 
       ruleQueue.phaseCompleted(phase);
     }
+
+    if (topDownOpt) {
+      tasks.push(OptimizeTask.create(root));
+      while (!tasks.isEmpty()) {
+        OptimizeTask task = tasks.peek();
+        if (task.hasSubTask()) {
+          tasks.push(task.nextSubTask());
+          continue;
+        }
+        task = tasks.pop();
+        task.execute();
+      }
+    }
+
     if (LOGGER.isTraceEnabled()) {
       StringWriter sw = new StringWriter();
       final PrintWriter pw = new PrintWriter(sw);
@@ -746,6 +785,11 @@ public class VolcanoPlanner extends AbstractRelOptPlanner {
     return set.getSubset(traits);
   }
 
+  boolean isSeedNode(RelNode node) {
+    final RelSet set = getSubset(node).set;
+    return set.seeds.contains(node);
+  }
+
   RelNode changeTraitsUsingConverters(
       RelNode rel,
       RelTraitSet toTraits) {
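
Reviewer note: the task loop added to the planner's optimization method above (push the root task, then peek, push a sub task if one exists, otherwise pop and execute) runs tasks in post-order. The sketch below is a simplified model, not Calcite code: the Task class and its names are hypothetical, and children are prebuilt here, whereas the real nextSubTask() creates sub tasks lazily.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the stack-driven task loop; not Calcite code. */
final class TaskStackSketch {
  /** Hypothetical task with a fixed list of sub tasks. */
  static final class Task {
    final String name;
    final List<Task> children = new ArrayList<>();
    int next = 0; // index of the next sub task to schedule

    Task(String name, Task... kids) {
      this.name = name;
      for (Task kid : kids) {
        children.add(kid);
      }
    }

    boolean hasSubTask() {
      return next < children.size();
    }

    Task nextSubTask() {
      return children.get(next++);
    }
  }

  /** Same control flow as the loop in the diff; returns execution order. */
  static List<String> run(Task root) {
    List<String> executed = new ArrayList<>();
    Deque<Task> tasks = new ArrayDeque<>();
    tasks.push(root);
    while (!tasks.isEmpty()) {
      Task task = tasks.peek();
      if (task.hasSubTask()) {
        tasks.push(task.nextSubTask());
        continue;
      }
      // No sub tasks left: the task itself is executed.
      executed.add(tasks.pop().name);
    }
    return executed;
  }

  static List<String> demo() {
    Task child = new Task("Subset#child");
    Task s1 = new Task("S1", child);
    Task s2 = new Task("S2");
    return run(new Task("Subset#root", s1, s2));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

An explicit stack keeps the traversal iterative, so a deep plan tree does not turn into deep Java recursion.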
diff --git a/core/src/main/java/org/apache/calcite/rel/PhysicalNode.java b/core/src/main/java/org/apache/calcite/rel/PhysicalNode.java
new file mode 100644
index 0000000..a2839a4
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/PhysicalNode.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.DeriveMode;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Physical node in a planner that is capable of doing
+ * physical trait propagation and derivation.
+ *
+ * <p>How to use?</p>
+ *
+ * <ol>
+ *   <li>Enable top-down optimization by setting
+ *   {@link org.apache.calcite.plan.volcano.VolcanoPlanner#setTopDownOpt(boolean)}.
+ *   </li>
+ *
+ *   <li>Let your convention's rel interface extend {@link PhysicalNode},
+ *   see {@link org.apache.calcite.adapter.enumerable.EnumerableRel} as
+ *   an example.</li>
+ *
+ *   <li>Each physical operator overrides any one of the two methods:
+ *   {@link PhysicalNode#passThrough(RelTraitSet)} or
+ *   {@link PhysicalNode#passThroughTraits(RelTraitSet)} depending on
+ *   your needs.</li>
+ *
+ *   <li>Choose derive mode for each physical operator by overriding
+ *   {@link PhysicalNode#getDeriveMode()}.</li>
+ *
+ *   <li>If the derive mode is {@link DeriveMode#OMAKASE}, override
+ *   method {@link PhysicalNode#derive(List)} in the physical operator,
+ *   otherwise, override {@link PhysicalNode#derive(RelTraitSet, int)}
+ *   or {@link PhysicalNode#deriveTraits(RelTraitSet, int)}.</li>
+ *
+ *   <li>Mark your enforcer operator by overriding {@link RelNode#isEnforcer()},
+ *   see {@link Sort#isEnforcer()} as an example. This is important,
+ *   because it can help {@code VolcanoPlanner} avoid unnecessary
+ *   trait propagation and derivation, and thereby improve optimization
+ *   efficiency.</li>
+ *
+ *   <li>Implement {@link Convention#enforce(RelNode, RelTraitSet)}
+ *   in your convention, which generates appropriate physical enforcer.
+ *   See {@link org.apache.calcite.adapter.enumerable.EnumerableConvention}
+ *   as an example. Simply return {@code null} if you don't want physical
+ *   trait enforcement.</li>
+ * </ol>
+ */
+public interface PhysicalNode extends RelNode {
+
+  /**
+   * Passes the required traitset from the parent node to child nodes;
+   * returns the new node after the traits are passed down.
+   */
+  default RelNode passThrough(RelTraitSet required) {
+    Pair<RelTraitSet, List<RelTraitSet>> p = passThroughTraits(required);
+    if (p == null) {
+      return null;
+    }
+    int size = getInputs().size();
+    assert size == p.right.size();
+    List<RelNode> list = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      RelNode n = RelOptRule.convert(getInput(i), p.right.get(i));
+      list.add(n);
+    }
+    return copy(p.left, list);
+  }
+
+  /**
+   * Passes the required traitset from the parent node to child nodes;
+   * returns a pair of traits after the traits are passed down.
+   *
+   * <p>Pair.left: the new traitset
+   * <p>Pair.right: the list of required traitsets for child nodes
+   */
+  default Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(
+      RelTraitSet required) {
+    throw new RuntimeException(getClass().getName()
+        + "#passThroughTraits() is not implemented.");
+  }
+
+  /**
+   * Derive traitset from child node, returns new node after
+   * traits derivation.
+   */
+  default RelNode derive(RelTraitSet childTraits, int childId) {
+    Pair<RelTraitSet, List<RelTraitSet>> p = deriveTraits(childTraits, childId);
+    if (p == null) {
+      return null;
+    }
+    int size = getInputs().size();
+    assert size == p.right.size();
+    List<RelNode> list = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      RelNode node = getInput(i);
+      node = RelOptRule.convert(node, p.right.get(i));
+      list.add(node);
+    }
+    return copy(p.left, list);
+  }
+
+  /**
+   * Derive traitset from child node, returns a pair of traits after
+   * traits derivation.
+   *
+   * <p>Pair.left: the new traitset
+   * <p>Pair.right: the list of required traitsets for child nodes
+   */
+  default Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(
+      RelTraitSet childTraits, int childId) {
+    throw new RuntimeException(getClass().getName()
+        + "#deriveTraits() is not implemented.");
+  }
+
+  /**
+   * Given a list of child traitsets, where
+   * inputTraits.size() == getInputs().size(),
+   * returns the node list after traits derivation. This method is called
+   * ONLY when the derive mode is OMAKASE.
+   */
+  default List<RelNode> derive(List<List<RelTraitSet>> inputTraits) {
+    throw new RuntimeException(getClass().getName()
+        + "#derive() is not implemented.");
+  }
+
+  /**
+   * Returns mode of derivation.
+   */
+  default DeriveMode getDeriveMode() {
+    return DeriveMode.LEFT_FIRST;
+  }
+}
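
Reviewer note: how the value returned by getDeriveMode() steers the loop over inputs in RelNodeOptTask#execute() can be sketched as follows. This is not Calcite code: the local Mode enum is a stand-in that lists only the constants referenced in this diff, and visitedInputs is a hypothetical helper that mirrors the loop's index arithmetic and early break.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of which inputs a derive mode inspects; not Calcite code. */
final class DeriveOrderSketch {
  /** Stand-in enum; only the constants referenced in the diff are listed. */
  enum Mode { LEFT_FIRST, RIGHT_FIRST, OMAKASE, PROHIBITED }

  /** Returns the input ordinals whose delivered traits are inspected. */
  static List<Integer> visitedInputs(Mode mode, int arity) {
    List<Integer> visited = new ArrayList<>();
    if (mode == Mode.PROHIBITED) {
      return visited; // the task completes without deriving anything
    }
    for (int i = 0; i < arity; i++) {
      // RIGHT_FIRST walks the inputs from the last one backwards.
      int childId = mode == Mode.RIGHT_FIRST ? arity - i - 1 : i;
      visited.add(childId);
      // LEFT_FIRST and RIGHT_FIRST stop after the first input.
      if (mode == Mode.LEFT_FIRST || mode == Mode.RIGHT_FIRST) {
        break;
      }
    }
    return visited;
  }

  public static void main(String[] args) {
    for (Mode mode : Mode.values()) {
      System.out.println(mode + " -> " + visitedInputs(mode, 2));
    }
  }
}
```

In the OMAKASE case the trait sets of all inputs are collected first and handed to derive(List) in a single call, which is why that mode visits every input.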
diff --git a/core/src/main/java/org/apache/calcite/rel/RelCollations.java b/core/src/main/java/org/apache/calcite/rel/RelCollations.java
index bf1820f..8f1bfb1 100644
--- a/core/src/main/java/org/apache/calcite/rel/RelCollations.java
+++ b/core/src/main/java/org/apache/calcite/rel/RelCollations.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Utilities concerning {@link org.apache.calcite.rel.RelCollation}
@@ -90,6 +91,16 @@ public class RelCollations {
   }
 
   /**
+   * Creates a collation containing multiple fields.
+   */
+  public static RelCollation of(ImmutableIntList keys) {
+    List<RelFieldCollation> cols = keys.stream()
+        .map(k -> new RelFieldCollation(k))
+        .collect(Collectors.toList());
+    return of(cols);
+  }
+
+  /**
    * Creates a list containing one collation containing one field.
    */
   public static List<RelCollation> createSingleton(int fieldIndex) {
diff --git a/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java b/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
index fc21ab7..bf94a6f 100644
--- a/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
+++ b/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
@@ -80,6 +80,13 @@ public abstract class CalciteTrace {
   }
 
   /**
+   * Reports volcano planner optimization task events.
+   */
+  public static Logger getPlannerTaskTracer() {
+    return LoggerFactory.getLogger("org.apache.calcite.plan.volcano.task");
+  }
+
+  /**
    * The "org.apache.calcite.prepare.Prepare" tracer prints the generated
    * program at DEBUG (formerly, FINE)  or higher.
    */
diff --git a/core/src/test/java/org/apache/calcite/plan/volcano/CollationConversionTest.java b/core/src/test/java/org/apache/calcite/plan/volcano/CollationConversionTest.java
index 932d59b..415457e 100644
--- a/core/src/test/java/org/apache/calcite/plan/volcano/CollationConversionTest.java
+++ b/core/src/test/java/org/apache/calcite/plan/volcano/CollationConversionTest.java
@@ -70,6 +70,7 @@ class CollationConversionTest {
     planner.addRule(new SingleNodeRule());
     planner.addRule(new LeafTraitRule());
     planner.addRule(ExpandConversionRule.INSTANCE);
+    planner.setTopDownOpt(false);
 
     final RelOptCluster cluster = newCluster(planner);
     final NoneLeafRel leafRel = new NoneLeafRel(cluster, "a");
diff --git a/core/src/test/java/org/apache/calcite/plan/volcano/PlannerTests.java b/core/src/test/java/org/apache/calcite/plan/volcano/PlannerTests.java
index e52e079..008af69 100644
--- a/core/src/test/java/org/apache/calcite/plan/volcano/PlannerTests.java
+++ b/core/src/test/java/org/apache/calcite/plan/volcano/PlannerTests.java
@@ -57,6 +57,11 @@ class PlannerTests {
             RelTraitSet fromTraits, RelTraitSet toTraits) {
           return true;
         }
+
+        @Override public RelNode enforce(final RelNode input,
+            final RelTraitSet required) {
+          return null;
+        }
       };
 
   static final Convention PHYS_CALLING_CONVENTION_2 =
diff --git a/core/src/test/java/org/apache/calcite/plan/volcano/TraitConversionTest.java b/core/src/test/java/org/apache/calcite/plan/volcano/TraitConversionTest.java
index e0d65c8..ab4151f 100644
--- a/core/src/test/java/org/apache/calcite/plan/volcano/TraitConversionTest.java
+++ b/core/src/test/java/org/apache/calcite/plan/volcano/TraitConversionTest.java
@@ -63,6 +63,7 @@ class TraitConversionTest {
     planner.addRule(new RandomSingleTraitRule());
     planner.addRule(new SingleLeafTraitRule());
     planner.addRule(ExpandConversionRule.INSTANCE);
+    planner.setTopDownOpt(false);
 
     final RelOptCluster cluster = newCluster(planner);
     final NoneLeafRel leafRel = new NoneLeafRel(cluster, "a");
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptTestBase.java b/core/src/test/java/org/apache/calcite/test/RelOptTestBase.java
index 5d610fa..6f3ee5c 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptTestBase.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptTestBase.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.test;
 
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -24,6 +25,7 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.core.RelFactories;
@@ -108,6 +110,10 @@ abstract class RelOptTestBase extends SqlToRelTestBase {
     diffRepos.assertEquals("planBefore", "${planBefore}", planBefore);
     SqlToRelTestBase.assertValid(relBefore);
 
+    if (planner instanceof VolcanoPlanner) {
+      relBefore = planner.changeTraits(relBefore,
+          relBefore.getTraitSet().replace(EnumerableConvention.INSTANCE));
+    }
     planner.setRoot(relBefore);
     RelNode r = planner.findBestExp();
     if (tester.isLateDecorrelate()) {
@@ -142,27 +148,27 @@ abstract class RelOptTestBase extends SqlToRelTestBase {
     private final Tester tester;
     private final String sql;
     private HepProgram preProgram;
-    private final HepPlanner hepPlanner;
+    private final RelOptPlanner planner;
     private final ImmutableMap<Hook, Consumer> hooks;
     private ImmutableList<Function<Tester, Tester>> transforms;
 
-    Sql(Tester tester, String sql, HepProgram preProgram, HepPlanner hepPlanner,
+    Sql(Tester tester, String sql, HepProgram preProgram, RelOptPlanner planner,
         ImmutableMap<Hook, Consumer> hooks,
         ImmutableList<Function<Tester, Tester>> transforms) {
       this.tester = tester;
       this.sql = sql;
       this.preProgram = preProgram;
-      this.hepPlanner = hepPlanner;
+      this.planner = planner;
       this.hooks = hooks;
       this.transforms = transforms;
     }
 
     public Sql withTester(UnaryOperator<Tester> transform) {
-      return new Sql(transform.apply(tester), sql, preProgram, hepPlanner, hooks, transforms);
+      return new Sql(transform.apply(tester), sql, preProgram, planner, hooks, transforms);
     }
 
     public Sql withPre(HepProgram preProgram) {
-      return new Sql(tester, sql, preProgram, hepPlanner, hooks, transforms);
+      return new Sql(tester, sql, preProgram, planner, hooks, transforms);
     }
 
     public Sql with(HepPlanner hepPlanner) {
@@ -185,7 +191,7 @@ abstract class RelOptTestBase extends SqlToRelTestBase {
     /** Adds a transform that will be applied to {@link #tester}
      * just before running the query. */
     private Sql withTransform(Function<Tester, Tester> transform) {
-      return new Sql(tester, sql, preProgram, hepPlanner, hooks,
+      return new Sql(tester, sql, preProgram, planner, hooks,
           FlatLists.append(transforms, transform));
     }
 
@@ -193,7 +199,7 @@ abstract class RelOptTestBase extends SqlToRelTestBase {
      * hook (by calling {@link Hook#addThread(Consumer)})
      * just before running the query, and remove the hook afterwards. */
     public <T> Sql withHook(Hook hook, Consumer<T> handler) {
-      return new Sql(tester, sql, preProgram, hepPlanner,
+      return new Sql(tester, sql, preProgram, planner,
           FlatLists.append(hooks, hook, handler), transforms);
     }
 
@@ -256,7 +262,7 @@ abstract class RelOptTestBase extends SqlToRelTestBase {
         for (Function<Tester, Tester> transform : transforms) {
           t = transform.apply(t);
         }
-        checkPlanning(t, preProgram, hepPlanner, sql, unchanged);
+        checkPlanning(t, preProgram, planner, sql, unchanged);
       }
     }
   }
diff --git a/core/src/test/java/org/apache/calcite/test/TopDownOptTest.java b/core/src/test/java/org/apache/calcite/test/TopDownOptTest.java
new file mode 100644
index 0000000..d188c7b
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/test/TopDownOptTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.rules.JoinCommuteRule;
+import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for top-down optimization.
+ *
+ * <p>As input, the test supplies a SQL statement and rules; the SQL is
+ * translated into relational algebra and then fed into a
+ * {@link VolcanoPlanner}. The plan before and after "optimization" is
+ * diffed against a .ref file using {@link DiffRepository}.
+ *
+ * <p>Procedure for adding a new test case:
+ *
+ * <ol>
+ * <li>Add a new public test method for your rule, following the existing
+ * examples. You'll have to come up with an SQL statement to which your rule
+ * will apply in a meaningful way. See
+ * {@link org.apache.calcite.test.catalog.MockCatalogReaderSimple} class
+ * for details on the schema.
+ *
+ * <li>Run the test. It should fail. Inspect the output in
+ * {@code target/surefire/.../TopDownOptTest.xml}.
+ *
+ * <li>Verify that the "planBefore" is the correct
+ * translation of your SQL, and that it contains the pattern on which your rule
+ * is supposed to fire. If all is well, replace
+ * {@code src/test/resources/.../TopDownOptTest.xml}
+ * with the new {@code target/surefire/.../TopDownOptTest.xml}.
+ *
+ * <li>Run the test again. It should fail again, but this time it should contain
+ * a "planAfter" entry for your rule. Verify that your rule applied its
+ * transformation correctly, and then update the
+ * {@code src/test/resources/.../TopDownOptTest.xml} file again.
+ *
+ * <li>Run the test one last time; this time it should pass.
+ * </ol>
+ */
+class TopDownOptTest extends RelOptTestBase {
+
+  protected DiffRepository getDiffRepos() {
+    return DiffRepository.lookup(TopDownOptTest.class);
+  }
+
+  Sql sql(String sql) {
+    VolcanoPlanner planner = new VolcanoPlanner();
+    // Always use top-down optimization
+    planner.setTopDownOpt(true);
+    planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
+    planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
+
+    RelOptUtil.registerDefaultRules(planner, false, false);
+
+    // Keep deterministic join order
+    planner.removeRule(JoinCommuteRule.INSTANCE);
+    planner.removeRule(JoinPushThroughJoinRule.LEFT);
+    planner.removeRule(JoinPushThroughJoinRule.RIGHT);
+
+    // Always use merge join
+    planner.removeRule(EnumerableRules.ENUMERABLE_JOIN_RULE);
+    planner.removeRule(EnumerableRules.ENUMERABLE_CALC_RULE);
+
+    // Always use sorted agg
+    planner.removeRule(EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
+    planner.addRule(EnumerableRules.ENUMERABLE_SORTED_AGGREGATE_RULE);
+
+    Tester tester = createTester().withDecorrelation(true)
+        .withClusterFactory(cluster -> RelOptCluster.create(planner, cluster.getRexBuilder()));
+
+    return new Sql(tester, sql, null, planner,
+        ImmutableMap.of(), ImmutableList.of());
+  }
+
+  @Test void testSortAgg() {
+    final String sql = "select mgr, count(*) from sales.emp\n"
+        + "group by mgr order by mgr desc nulls last limit 5";
+    sql(sql).check();
+  }
+
+  @Test void testSortAggPartialKey() {
+    final String sql = "select mgr,deptno,comm,count(*) from sales.emp\n"
+        + "group by mgr,deptno,comm\n"
+        + "order by comm desc nulls last, deptno nulls first";
+    sql(sql).check();
+  }
+
+  @Test void testSortMergeJoin() {
+    final String sql = "select * from\n"
+        + "sales.emp r join sales.bonus s on r.ename=s.ename and r.job=s.job\n"
+        + "order by r.job desc nulls last, r.ename nulls first";
+    sql(sql).check();
+  }
+
+  @Test void testSortMergeJoinRight() {
+    final String sql = "select * from\n"
+        + "sales.emp r join sales.bonus s on r.ename=s.ename and r.job=s.job\n"
+        + "order by s.job desc nulls last, s.ename nulls first";
+    sql(sql).check();
+  }
+
+  @Test void testMergeJoinDeriveLeft1() {
+    final String sql = "select * from\n"
+        + "(select ename, job, max(sal) from sales.emp group by ename, job) r\n"
+        + "join sales.bonus s on r.job=s.job and r.ename=s.ename";
+    sql(sql).check();
+  }
+
+  @Test void testMergeJoinDeriveLeft2() {
+    final String sql = "select * from\n"
+        + "(select ename, job, mgr, max(sal) from sales.emp group by ename, job, mgr) r\n"
+        + "join sales.bonus s on r.job=s.job and r.ename=s.ename";
+    sql(sql).check();
+  }
+
+  @Test void testMergeJoinDeriveRight1() {
+    final String sql = "select * from sales.bonus s join\n"
+        + "(select ename, job, max(sal) from sales.emp group by ename, job) r\n"
+        + "on r.job=s.job and r.ename=s.ename";
+    sql(sql).check();
+  }
+
+  @Test void testMergeJoinDeriveRight2() {
+    final String sql = "select * from sales.bonus s join\n"
+        + "(select ename, job, mgr, max(sal) from sales.emp group by ename, job, mgr) r\n"
+        + "on r.job=s.job and r.ename=s.ename";
+    sql(sql).check();
+  }
+}
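
Note on testSortAggPartialKey: the query groups by mgr, deptno, comm but
orders by comm desc nulls last, deptno nulls first. The expected plan in
TopDownOptTest.xml sorts the scan on ($6, $7, $3), that is, the requested
order followed by the remaining group key. The sketch below illustrates
that idea in isolation. It is not Calcite's implementation: the class
SortedAggCollationSketch and the method childCollation are hypothetical
names, and sort directions are plain strings rather than Calcite collation
objects.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; not part of Calcite.
final class SortedAggCollationSketch {

  /**
   * Computes the sort order to request from the input of a sorted
   * aggregate, given the order required of its output.
   *
   * @param required  column index to direction, in sort-key order
   * @param groupKeys group-by column indexes
   * @return the input sort order, or null if the required order uses a
   *         column that is not a group key
   */
  static Map<Integer, String> childCollation(
      Map<Integer, String> required, List<Integer> groupKeys) {
    Set<Integer> remaining = new LinkedHashSet<>(groupKeys);
    Map<Integer, String> result = new LinkedHashMap<>();
    for (Map.Entry<Integer, String> key : required.entrySet()) {
      if (!remaining.remove(key.getKey())) {
        return null; // sort key is not a group key
      }
      result.put(key.getKey(), key.getValue());
    }
    // Group keys not mentioned in the required order are appended with
    // an arbitrary direction (ascending in this sketch).
    for (int column : remaining) {
      result.put(column, "ASC");
    }
    return result;
  }

  public static void main(String[] args) {
    // group by mgr($3), comm($6), deptno($7);
    // order by comm desc nulls last, deptno nulls first
    Map<Integer, String> required = new LinkedHashMap<>();
    required.put(6, "DESC-nulls-last");
    required.put(7, "ASC-nulls-first");
    System.out.println(childCollation(required, Arrays.asList(3, 6, 7)));
    // prints {6=DESC-nulls-last, 7=ASC-nulls-first, 3=ASC}
  }
}
```

With this input order a single sort below the aggregate serves both the
grouping and the final ORDER BY, which is why the expected plan has no
sort above EnumerableSortedAggregate.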
diff --git a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
index e6ce8a0..ed71050 100644
--- a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
@@ -969,7 +969,7 @@ public class PlannerTest {
     RelNode transform = planner.transform(0, traitSet, convert);
     assertThat(toString(transform),
         containsString(
-            "EnumerableHashJoin(condition=[=($0, $5)], joinType=[inner])"));
+            "EnumerableMergeJoin(condition=[=($0, $5)], joinType=[inner])"));
   }
 
   /** Test case for
@@ -1030,10 +1030,12 @@ public class PlannerTest {
         + "EnumerableProject(empid=[$2], deptno=[$3], name=[$4], salary=[$5], commission=[$6], deptno0=[$7], name0=[$8], employees=[$9], location=[ROW($10, $11)], empid0=[$0], name1=[$1])\n"
         + "  EnumerableHashJoin(condition=[=($0, $2)], joinType=[left])\n"
         + "    EnumerableTableScan(table=[[hr, dependents]])\n"
-        + "    EnumerableHashJoin(condition=[=($1, $5)], joinType=[inner])\n"
-        + "      EnumerableTableScan(table=[[hr, emps]])\n"
-        + "      EnumerableProject(deptno=[$0], name=[$1], employees=[$2], x=[$3.x], y=[$3.y])\n"
-        + "        EnumerableTableScan(table=[[hr, depts]])";
+        + "    EnumerableMergeJoin(condition=[=($1, $5)], joinType=[inner])\n"
+        + "      EnumerableSort(sort0=[$1], dir0=[ASC])\n"
+        + "        EnumerableTableScan(table=[[hr, emps]])\n"
+        + "      EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+        + "        EnumerableProject(deptno=[$0], name=[$1], employees=[$2], x=[$3.x], y=[$3.y])\n"
+        + "          EnumerableTableScan(table=[[hr, depts]])";
     checkHeuristic(sql, expected);
   }
 
@@ -1098,10 +1100,11 @@ public class PlannerTest {
         + "      EnumerableFilter(condition=[=($9, 'San Francisco')])\n"
         + "        EnumerableTableScan(table=[[foodmart2, customer]])\n"
         + "      EnumerableHashJoin(condition=[=($6, $20)], joinType=[inner])\n"
-        + "        EnumerableHashJoin(condition=[=($0, $5)], joinType=[inner])\n"
+        + "        EnumerableMergeJoin(condition=[=($0, $5)], joinType=[inner])\n"
         + "          EnumerableTableScan(table=[[foodmart2, product_class]])\n"
-        + "          EnumerableFilter(condition=[=($2, 'Washington')])\n"
-        + "            EnumerableTableScan(table=[[foodmart2, product]])\n"
+        + "          EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+        + "            EnumerableFilter(condition=[=($2, 'Washington')])\n"
+        + "              EnumerableTableScan(table=[[foodmart2, product]])\n"
         + "        EnumerableTableScan(table=[[foodmart2, sales_fact_1997]])\n";
     checkBushy(sql, expected);
   }
@@ -1128,11 +1131,13 @@ public class PlannerTest {
         + "      EnumerableHashJoin(condition=[=($0, $51)], joinType=[inner])\n"
         + "        EnumerableFilter(condition=[=($9, 'San Francisco')])\n"
         + "          EnumerableTableScan(table=[[foodmart2, customer]])\n"
-        + "        EnumerableHashJoin(condition=[=($6, $20)], joinType=[inner])\n"
-        + "          EnumerableHashJoin(condition=[=($0, $5)], joinType=[inner])\n"
-        + "            EnumerableTableScan(table=[[foodmart2, product_class]])\n"
-        + "            EnumerableTableScan(table=[[foodmart2, product]])\n"
-        + "          EnumerableTableScan(table=[[foodmart2, sales_fact_1997]])\n";
+        + "        EnumerableMergeJoin(condition=[=($6, $20)], joinType=[inner])\n"
+        + "          EnumerableSort(sort0=[$6], dir0=[ASC])\n"
+        + "            EnumerableHashJoin(condition=[=($0, $5)], joinType=[inner])\n"
+        + "              EnumerableTableScan(table=[[foodmart2, product_class]])\n"
+        + "              EnumerableTableScan(table=[[foodmart2, product]])\n"
+        + "          EnumerableSort(sort0=[$0], dir0=[ASC])\n"
+        + "            EnumerableTableScan(table=[[foodmart2, sales_fact_1997]])\n";
     checkBushy(sql, expected);
   }
 
diff --git a/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml b/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml
new file mode 100644
index 0000000..2a38a02
--- /dev/null
+++ b/core/src/test/resources/org/apache/calcite/test/TopDownOptTest.xml
@@ -0,0 +1,222 @@
+<?xml version="1.0" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to you under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<Root>
+    <TestCase name="testSortAgg">
+        <Resource name="sql">
+            <![CDATA[select mgr, count(*) from sales.emp
+group by mgr order by mgr desc nulls last limit 5]]>
+        </Resource>
+        <Resource name="planBefore">
+            <![CDATA[
+LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[5])
+  LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
+    LogicalProject(MGR=[$3])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+        </Resource>
+        <Resource name="planAfter">
+            <![CDATA[
+EnumerableLimit(fetch=[5])
+  EnumerableSortedAggregate(group=[{3}], EXPR$1=[COUNT()])
+    EnumerableSort(sort0=[$3], dir0=[DESC-nulls-last])
+      EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+        </Resource>
+    </TestCase>
+  <TestCase name="testSortAggPartialKey">
+    <Resource name="sql">
+      <![CDATA[select mgr,deptno,comm,count(*) from sales.emp
+        group by mgr,deptno,comm
+        order by comm desc nulls last, deptno nulls first]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(sort0=[$2], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])
+  LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
+    LogicalProject(MGR=[$3], DEPTNO=[$7], COMM=[$6])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+EnumerableProject(MGR=[$0], DEPTNO=[$2], COMM=[$1], EXPR$3=[$3])
+  EnumerableSortedAggregate(group=[{3, 6, 7}], EXPR$3=[COUNT()])
+    EnumerableSort(sort0=[$6], sort1=[$7], sort2=[$3], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first], dir2=[ASC])
+      EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSortMergeJoin">
+    <Resource name="sql">
+      <![CDATA[select * from
+        sales.emp r join sales.bonus s on r.ename=s.ename and r.job=s.job
+        order by r.job desc nulls last, r.ename nulls first]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(sort0=[$2], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])
+  LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], ENAME0=[$9], JOB0=[$10], SAL0=[$11], COMM0=[$12])
+    LogicalJoin(condition=[AND(=($1, $9), =($2, $10))], joinType=[inner])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+EnumerableMergeJoin(condition=[AND(=($1, $9), =($2, $10))], joinType=[inner])
+  EnumerableSort(sort0=[$2], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])
+    EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
+  EnumerableSort(sort0=[$1], sort1=[$0], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])
+    EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSortMergeJoinRight">
+    <Resource name="sql">
+      <![CDATA[select * from
+        sales.emp r join sales.bonus s on r.ename=s.ename and r.job=s.job
+        order by s.job desc nulls last, s.ename nulls first]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(sort0=[$10], sort1=[$9], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])
+  LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], ENAME0=[$9], JOB0=[$10], SAL0=[$11], COMM0=[$12])
+    LogicalJoin(condition=[AND(=($1, $9), =($2, $10))], joinType=[inner])
+      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+EnumerableMergeJoin(condition=[AND(=($1, $9), =($2, $10))], joinType=[inner])
+  EnumerableSort(sort0=[$2], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])
+    EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
+  EnumerableSort(sort0=[$1], sort1=[$0], dir0=[DESC-nulls-last], dir1=[ASC-nulls-first])
+    EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMergeJoinDeriveLeft1">
+    <Resource name="sql">
+      <![CDATA[select * from
+        (select ename, job, max(sal) from sales.emp group by ename, job) r
+        join sales.bonus s on r.job=s.job and r.ename=s.ename]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(ENAME=[$0], JOB=[$1], EXPR$2=[$2], ENAME0=[$3], JOB0=[$4], SAL=[$5], COMM=[$6])
+  LogicalJoin(condition=[AND(=($1, $4), =($0, $3))], joinType=[inner])
+    LogicalAggregate(group=[{0, 1}], EXPR$2=[MAX($2)])
+      LogicalProject(ENAME=[$1], JOB=[$2], SAL=[$5])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+EnumerableMergeJoin(condition=[AND(=($1, $4), =($0, $3))], joinType=[inner])
+  EnumerableSortedAggregate(group=[{1, 2}], EXPR$2=[MAX($5)])
+    EnumerableSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])
+      EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
+  EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+    EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMergeJoinDeriveLeft2">
+    <Resource name="sql">
+      <![CDATA[select * from
+        (select ename, job, mgr, max(sal) from sales.emp group by ename, job, mgr) r
+        join sales.bonus s on r.job=s.job and r.ename=s.ename]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(ENAME=[$0], JOB=[$1], MGR=[$2], EXPR$3=[$3], ENAME0=[$4], JOB0=[$5], SAL=[$6], COMM=[$7])
+  LogicalJoin(condition=[AND(=($1, $5), =($0, $4))], joinType=[inner])
+    LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[MAX($3)])
+      LogicalProject(ENAME=[$1], JOB=[$2], MGR=[$3], SAL=[$5])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+EnumerableMergeJoin(condition=[AND(=($1, $5), =($0, $4))], joinType=[inner])
+  EnumerableSortedAggregate(group=[{1, 2, 3}], EXPR$3=[MAX($5)])
+    EnumerableSort(sort0=[$2], sort1=[$1], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+      EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
+  EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+    EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMergeJoinDeriveRight1">
+    <Resource name="sql">
+      <![CDATA[select * from sales.bonus s join
+        (select ename, job, max(sal) from sales.emp group by ename, job) r
+        on r.job=s.job and r.ename=s.ename]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], ENAME0=[$4], JOB0=[$5], EXPR$2=[$6])
+  LogicalJoin(condition=[AND(=($5, $1), =($4, $0))], joinType=[inner])
+    LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+    LogicalAggregate(group=[{0, 1}], EXPR$2=[MAX($2)])
+      LogicalProject(ENAME=[$1], JOB=[$2], SAL=[$5])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+EnumerableMergeJoin(condition=[AND(=($1, $5), =($0, $4))], joinType=[inner])
+  EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+    EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
+  EnumerableSortedAggregate(group=[{1, 2}], EXPR$2=[MAX($5)])
+    EnumerableSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])
+      EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMergeJoinDeriveRight2">
+    <Resource name="sql">
+      <![CDATA[select * from sales.bonus s join
+        (select ename, job, mgr, max(sal) from sales.emp group by ename, job, mgr) r
+        on r.job=s.job and r.ename=s.ename]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(ENAME=[$0], JOB=[$1], SAL=[$2], COMM=[$3], ENAME0=[$4], JOB0=[$5], MGR=[$6], EXPR$3=[$7])
+  LogicalJoin(condition=[AND(=($5, $1), =($4, $0))], joinType=[inner])
+    LogicalTableScan(table=[[CATALOG, SALES, BONUS]])
+    LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[MAX($3)])
+      LogicalProject(ENAME=[$1], JOB=[$2], MGR=[$3], SAL=[$5])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+EnumerableMergeJoin(condition=[AND(=($1, $5), =($0, $4))], joinType=[inner])
+  EnumerableSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[ASC])
+    EnumerableTableScan(table=[[CATALOG, SALES, BONUS]])
+  EnumerableSortedAggregate(group=[{1, 2, 3}], EXPR$3=[MAX($5)])
+    EnumerableSort(sort0=[$2], sort1=[$1], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC])
+      EnumerableTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
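
Note on the merge join plans above: in every planAfter, both inputs of
EnumerableMergeJoin are sorted on the corresponding join keys with
matching directions. The sketch below shows why a merge join needs that,
using a single ascending integer key. It is a standalone illustration
under simplifying assumptions, not the Enumerable implementation;
MergeJoinSketch and mergeJoin are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model; not part of Calcite.
final class MergeJoinSketch {

  /**
   * Inner-joins two key arrays that are both sorted ascending.
   * Returns one {leftKey, rightKey} pair per matching combination.
   * The result is only correct if both inputs are sorted the same way.
   */
  static List<int[]> mergeJoin(int[] left, int[] right) {
    List<int[]> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.length && j < right.length) {
      if (left[i] < right[j]) {
        i++; // left key too small, advance left
      } else if (left[i] > right[j]) {
        j++; // right key too small, advance right
      } else {
        // Equal keys: emit the cross product of the two runs.
        int key = left[i];
        int runStart = j;
        while (i < left.length && left[i] == key) {
          for (j = runStart; j < right.length && right[j] == key; j++) {
            out.add(new int[] {left[i], right[j]});
          }
          i++;
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    int[] left = {4, 2, 1, 2};
    int[] right = {3, 2, 4, 2};
    // Sorting stands in for the EnumerableSort nodes below the join.
    Arrays.sort(left);
    Arrays.sort(right);
    for (int[] pair : mergeJoin(left, right)) {
      System.out.println(pair[0] + " = " + pair[1]);
    }
  }
}
```

Because each input is scanned once in key order, an operator above the
join that needs the same order on those keys does not need a further
sort, as in testSortMergeJoin and testSortMergeJoinRight.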
diff --git a/core/src/test/resources/saffron.properties b/core/src/test/resources/saffron.properties
new file mode 100644
index 0000000..1e2802f
--- /dev/null
+++ b/core/src/test/resources/saffron.properties
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+calcite.planner.topdown.opt=true
diff --git a/plus/src/test/java/org/apache/calcite/adapter/tpcds/TpcdsTest.java b/plus/src/test/java/org/apache/calcite/adapter/tpcds/TpcdsTest.java
index dec8206..32bc89a 100644
--- a/plus/src/test/java/org/apache/calcite/adapter/tpcds/TpcdsTest.java
+++ b/plus/src/test/java/org/apache/calcite/adapter/tpcds/TpcdsTest.java
@@ -220,29 +220,33 @@ class TpcdsTest {
         .withHook(Hook.PROGRAM, handler(true, 2))
         .explainMatches("including all attributes ",
             CalciteAssert.checkMaskedResultContains(""
-                + "EnumerableCalc(expr#0..9=[{inputs}], expr#10=[/($t4, $t3)], expr#11=[CAST($t10):INTEGER NOT NULL], expr#12=[*($t4, $t4)], expr#13=[/($t12, $t3)], expr#14=[-($t5, $t13)], expr#15=[1], expr#16=[=($t3, $t15)], expr#17=[null:BIGINT], expr#18=[-($t3, $t15)], expr#19=[CASE($t16, $t17, $t18)], expr#20=[/($t14, $t19)], expr#21=[0.5:DECIMAL(2, 1)], expr#22=[POWER($t20, $t21)], expr#23=[CAST($t22):INTEGER NOT NULL], expr#24=[/($t23, $t11)], expr#25=[/($t6, $t3)], expr#26=[CAST($ [...]
-                + "  EnumerableLimit(fetch=[100]): rowcount = 100.0, cumulative cost = {1.2435775409784036E28 rows, 2.555295485909236E30 cpu, 0.0 io}\n"
-                + "    EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC]): rowcount = 5.434029018852197E26, cumulative cost = {1.2435775409784036E28 rows, 2.555295485909236E30 cpu, 0.0 io}\n"
-                + "      EnumerableAggregate(group=[{0, 1, 2}], STORE_SALES_QUANTITYCOUNT=[COUNT()], agg#1=[$SUM0($3)], agg#2=[$SUM0($6)], agg#3=[$SUM0($4)], agg#4=[$SUM0($7)], agg#5=[$SUM0($5)], agg#6=[$SUM0($8)]): rowcount = 5.434029018852197E26, cumulative cost = {1.1892372507898816E28 rows, 1.2172225002228922E30 cpu, 0.0 io}\n"
-                + "        EnumerableCalc(expr#0..211=[{inputs}], expr#212=[*($t89, $t89)], expr#213=[*($t140, $t140)], expr#214=[*($t196, $t196)], I_ITEM_ID=[$t58], I_ITEM_DESC=[$t61], S_STATE=[$t24], SS_QUANTITY=[$t89], SR_RETURN_QUANTITY=[$t140], CS_QUANTITY=[$t196], $f6=[$t212], $f7=[$t213], $f8=[$t214]): rowcount = 5.434029018852197E27, cumulative cost = {1.0873492066864028E28 rows, 1.2172225002228922E30 cpu, 0.0 io}\n"
-                + "          EnumerableHashJoin(condition=[AND(=($82, $133), =($81, $132), =($88, $139))], joinType=[inner]): rowcount = 5.434029018852197E27, cumulative cost = {5.439463048011832E27 rows, 1.8506796E7 cpu, 0.0 io}\n"
-                + "            EnumerableHashJoin(condition=[=($0, $86)], joinType=[inner]): rowcount = 2.3008402586892598E13, cumulative cost = {4.8588854672854766E13 rows, 7281360.0 cpu, 0.0 io}\n"
-                + "              EnumerableTableScan(table=[[TPCDS, STORE]]): rowcount = 12.0, cumulative cost = {12.0 rows, 13.0 cpu, 0.0 io}\n"
-                + "              EnumerableHashJoin(condition=[=($0, $50)], joinType=[inner]): rowcount = 1.2782445881607E13, cumulative cost = {1.279800620431234E13 rows, 7281347.0 cpu, 0.0 io}\n"
-                + "                EnumerableCalc(expr#0..27=[{inputs}], expr#28=['1998Q1'], expr#29=[=($t15, $t28)], proj#0..27=[{exprs}], $condition=[$t29]): rowcount = 10957.35, cumulative cost = {84006.35 rows, 4382941.0 cpu, 0.0 io}\n"
-                + "                  EnumerableTableScan(table=[[TPCDS, DATE_DIM]]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}\n"
-                + "                EnumerableHashJoin(condition=[=($0, $24)], joinType=[inner]): rowcount = 7.7770908E9, cumulative cost = {7.783045975286664E9 rows, 2898406.0 cpu, 0.0 io}\n"
-                + "                  EnumerableTableScan(table=[[TPCDS, ITEM]]): rowcount = 18000.0, cumulative cost = {18000.0 rows, 18001.0 cpu, 0.0 io}\n"
-                + "                  EnumerableTableScan(table=[[TPCDS, STORE_SALES]]): rowcount = 2880404.0, cumulative cost = {2880404.0 rows, 2880405.0 cpu, 0.0 io}\n"
-                + "            EnumerableHashJoin(condition=[AND(=($31, $79), =($30, $91))], joinType=[inner]): rowcount = 6.9978029381741304E16, cumulative cost = {7.0048032234040472E16 rows, 1.1225436E7 cpu, 0.0 io}\n"
-                + "              EnumerableHashJoin(condition=[=($0, $28)], joinType=[inner]): rowcount = 7.87597881975E8, cumulative cost = {7.884434212216867E8 rows, 5035701.0 cpu, 0.0 io}\n"
-                + "                EnumerableCalc(expr#0..27=[{inputs}], expr#28=['1998Q1'], expr#29=[=($t15, $t28)], expr#30=['1998Q2'], expr#31=[=($t15, $t30)], expr#32=['1998Q3'], expr#33=[=($t15, $t32)], expr#34=[OR($t29, $t31, $t33)], proj#0..27=[{exprs}], $condition=[$t34]): rowcount = 18262.25, cumulative cost = {91311.25 rows, 4748186.0 cpu, 0.0 io}\n"
-                + "                  EnumerableTableScan(table=[[TPCDS, DATE_DIM]]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}\n"
-                + "                EnumerableTableScan(table=[[TPCDS, STORE_RETURNS]]): rowcount = 287514.0, cumulative cost = {287514.0 rows, 287515.0 cpu, 0.0 io}\n"
-                + "              EnumerableHashJoin(condition=[=($0, $28)], joinType=[inner]): rowcount = 3.94888649445E9, cumulative cost = {3.9520401026966867E9 rows, 6189735.0 cpu, 0.0 io}\n"
-                + "                EnumerableCalc(expr#0..27=[{inputs}], expr#28=['1998Q1'], expr#29=[=($t15, $t28)], expr#30=['1998Q2'], expr#31=[=($t15, $t30)], expr#32=['1998Q3'], expr#33=[=($t15, $t32)], expr#34=[OR($t29, $t31, $t33)], proj#0..27=[{exprs}], $condition=[$t34]): rowcount = 18262.25, cumulative cost = {91311.25 rows, 4748186.0 cpu, 0.0 io}\n"
-                + "                  EnumerableTableScan(table=[[TPCDS, DATE_DIM]]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}\n"
-                + "                EnumerableTableScan(table=[[TPCDS, CATALOG_SALES]]): rowcount = 1441548.0, cumulative cost = {1441548.0 rows, 1441549.0 cpu, 0.0 io}\n"));
+                + "EnumerableCalc(expr#0..9=[{inputs}], expr#10=[/($t4, $t3)], expr#11=[CAST($t10):INTEGER NOT NULL], expr#12=[*($t4, $t4)], expr#13=[/($t12, $t3)], expr#14=[-($t5, $t13)], expr#15=[1], expr#16=[=($t3, $t15)], expr#17=[null:BIGINT], expr#18=[-($t3, $t15)], expr#19=[CASE($t16, $t17, $t18)], expr#20=[/($t14, $t19)], expr#21=[0.5:DECIMAL(2, 1)], expr#22=[POWER($t20, $t21)], expr#23=[CAST($t22):INTEGER NOT NULL], expr#24=[/($t23, $t11)], expr#25=[/($t6, $t3)], expr#26=[CAST($ [...]
+                + "  EnumerableLimit(fetch=[100]): rowcount = 100.0, cumulative cost = {1.2430341380834431E28 rows, 2.555295487103789E30 cpu, 0.0 io}\n"
+                + "    EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC]): rowcount = 5.434029018852197E26, cumulative cost = {1.2430341380834431E28 rows, 2.555295487103789E30 cpu, 0.0 io}\n"
+                + "      EnumerableAggregate(group=[{0, 1, 2}], STORE_SALES_QUANTITYCOUNT=[COUNT()], agg#1=[$SUM0($3)], agg#2=[$SUM0($6)], agg#3=[$SUM0($4)], agg#4=[$SUM0($7)], agg#5=[$SUM0($5)], agg#6=[$SUM0($8)]): rowcount = 5.434029018852197E26, cumulative cost = {1.1886938478949211E28 rows, 1.2172225014174444E30 cpu, 0.0 io}\n"
+                + "        EnumerableCalc(expr#0..211=[{inputs}], expr#212=[*($t89, $t89)], expr#213=[*($t140, $t140)], expr#214=[*($t196, $t196)], I_ITEM_ID=[$t58], I_ITEM_DESC=[$t61], S_STATE=[$t24], SS_QUANTITY=[$t89], SR_RETURN_QUANTITY=[$t140], CS_QUANTITY=[$t196], $f6=[$t212], $f7=[$t213], $f8=[$t214]): rowcount = 5.434029018852197E27, cumulative cost = {1.0868058037914423E28 rows, 1.2172225014174444E30 cpu, 0.0 io}\n"
+                + "          EnumerableMergeJoin(condition=[AND(=($82, $133), =($81, $132), =($88, $139))], joinType=[inner]): rowcount = 5.434029018852197E27, cumulative cost = {5.434029019062226E27 rows, 1.1945521881363807E21 cpu, 0.0 io}\n"
+                + "            EnumerableSort(sort0=[$82], sort1=[$81], sort2=[$88], dir0=[ASC], dir1=[ASC], dir2=[ASC]): rowcount = 2.3008402586892598E13, cumulative cost = {7.159725725974736E13 rows, 2.8882188423696301E17 cpu, 0.0 io}\n"
+                + "              EnumerableHashJoin(condition=[=($0, $86)], joinType=[inner]): rowcount = 2.3008402586892598E13, cumulative cost = {4.8588854672854766E13 rows, 7281360.0 cpu, 0.0 io}\n"
+                + "                EnumerableTableScan(table=[[TPCDS, STORE]]): rowcount = 12.0, cumulative cost = {12.0 rows, 13.0 cpu, 0.0 io}\n"
+                + "                EnumerableHashJoin(condition=[=($0, $50)], joinType=[inner]): rowcount = 1.2782445881607E13, cumulative cost = {1.279800620431234E13 rows, 7281347.0 cpu, 0.0 io}\n"
+                + "                  EnumerableCalc(expr#0..27=[{inputs}], expr#28=['1998Q1'], expr#29=[=($t15, $t28)], proj#0..27=[{exprs}], $condition=[$t29]): rowcount = 10957.35, cumulative cost = {84006.35 rows, 4382941.0 cpu, 0.0 io}\n"
+                + "                    EnumerableTableScan(table=[[TPCDS, DATE_DIM]]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}\n"
+                + "                  EnumerableHashJoin(condition=[=($0, $24)], joinType=[inner]): rowcount = 7.7770908E9, cumulative cost = {7.783045975286664E9 rows, 2898406.0 cpu, 0.0 io}\n"
+                + "                    EnumerableTableScan(table=[[TPCDS, ITEM]]): rowcount = 18000.0, cumulative cost = {18000.0 rows, 18001.0 cpu, 0.0 io}\n"
+                + "                    EnumerableTableScan(table=[[TPCDS, STORE_SALES]]): rowcount = 2880404.0, cumulative cost = {2880404.0 rows, 2880405.0 cpu, 0.0 io}\n"
+                + "            EnumerableSort(sort0=[$31], sort1=[$30], sort2=[$37], dir0=[ASC], dir1=[ASC], dir2=[ASC]): rowcount = 6.9978029381741304E16, cumulative cost = {1.3995607297693488E17 rows, 1.1942633662521438E21 cpu, 0.0 io}\n"
+                + "              EnumerableMergeJoin(condition=[AND(=($31, $79), =($30, $91))], joinType=[inner]): rowcount = 6.9978029381741304E16, cumulative cost = {6.9978043595193584E16 rows, 2.473747714463278E13 cpu, 0.0 io}\n"
+                + "                EnumerableSort(sort0=[$31], sort1=[$30], dir0=[ASC], dir1=[ASC]): rowcount = 7.87597881975E8, cumulative cost = {1.5760413031966867E9 rows, 3.097646138009654E12 cpu, 0.0 io}\n"
+                + "                  EnumerableHashJoin(condition=[=($0, $28)], joinType=[inner]): rowcount = 7.87597881975E8, cumulative cost = {7.884434212216867E8 rows, 5035701.0 cpu, 0.0 io}\n"
+                + "                    EnumerableCalc(expr#0..27=[{inputs}], expr#28=['1998Q1'], expr#29=[=($t15, $t28)], expr#30=['1998Q2'], expr#31=[=($t15, $t30)], expr#32=['1998Q3'], expr#33=[=($t15, $t32)], expr#34=[OR($t29, $t31, $t33)], proj#0..27=[{exprs}], $condition=[$t34]): rowcount = 18262.25, cumulative cost = {91311.25 rows, 4748186.0 cpu, 0.0 io}\n"
+                + "                      EnumerableTableScan(table=[[TPCDS, DATE_DIM]]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}\n"
+                + "                    EnumerableTableScan(table=[[TPCDS, STORE_RETURNS]]): rowcount = 287514.0, cumulative cost = {287514.0 rows, 287515.0 cpu, 0.0 io}\n"
+                + "                EnumerableSort(sort0=[$31], sort1=[$43], dir0=[ASC], dir1=[ASC]): rowcount = 3.94888649445E9, cumulative cost = {7.900926597146687E9 rows, 2.163983100662313E13 cpu, 0.0 io}\n"
+                + "                  EnumerableHashJoin(condition=[=($0, $28)], joinType=[inner]): rowcount = 3.94888649445E9, cumulative cost = {3.9520401026966867E9 rows, 6189735.0 cpu, 0.0 io}\n"
+                + "                    EnumerableCalc(expr#0..27=[{inputs}], expr#28=['1998Q1'], expr#29=[=($t15, $t28)], expr#30=['1998Q2'], expr#31=[=($t15, $t30)], expr#32=['1998Q3'], expr#33=[=($t15, $t32)], expr#34=[OR($t29, $t31, $t33)], proj#0..27=[{exprs}], $condition=[$t34]): rowcount = 18262.25, cumulative cost = {91311.25 rows, 4748186.0 cpu, 0.0 io}\n"
+                + "                      EnumerableTableScan(table=[[TPCDS, DATE_DIM]]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}\n"
+                + "                    EnumerableTableScan(table=[[TPCDS, CATALOG_SALES]]): rowcount = 1441548.0, cumulative cost = {1441548.0 rows, 1441549.0 cpu, 0.0 io}\n"));
   }
 
   @Disabled("throws 'RuntimeException: Cannot convert null to long'")