You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/07/11 07:02:05 UTC

[doris] branch master updated: [feature](Nereids): cost and enforcer job in cascades. (#10657)

This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 51855633e4 [feature](Nereids): cost and enforcer job in cascades. (#10657)
51855633e4 is described below

commit 51855633e47b6eb6c7b2618bf16d4d073d073168
Author: jakevin <ja...@gmail.com>
AuthorDate: Mon Jul 11 15:01:59 2022 +0800

    [feature](Nereids): cost and enforcer job in cascades. (#10657)
    
    Issue Number: close #9640
    
    Add enforcer job for cascades.
    
    Inspired by the *NoisePage enforcer job* and the *ORCA paper*.
    
    During this phase, we derive the physical properties of the plan tree and prune plans according to their cost.
---
 .../apache/doris/nereids/cost/CostCalculator.java  |   4 +-
 .../org/apache/doris/nereids/jobs/JobContext.java  |   6 +-
 .../nereids/jobs/cascades/CostAndEnforcerJob.java  | 189 +++++++++++++++++++++
 .../java/org/apache/doris/nereids/memo/Group.java  |  31 ++++
 .../apache/doris/nereids/memo/GroupExpression.java |  34 ++++
 .../java/org/apache/doris/nereids/memo/Memo.java   |   7 +
 .../doris/nereids/operators/OperatorType.java      |   7 +-
 .../plans/physical/PhysicalDistribution.java}      |  29 ++--
 .../properties/ChildrenOutputPropertyDeriver.java  |  74 ++++++++
 .../doris/nereids/properties/DistributionSpec.java |  29 +++-
 .../properties/EnforceMissingPropertiesHelper.java | 125 ++++++++++++++
 ...Properties.java => GatherDistributionSpec.java} |  15 +-
 .../apache/doris/nereids/properties/OrderKey.java  |   9 +
 .../apache/doris/nereids/properties/OrderSpec.java |  65 +++++++
 .../properties/ParentRequiredPropertyDeriver.java  |  58 +++++++
 .../nereids/properties/PhysicalProperties.java     |  35 +++-
 16 files changed, 678 insertions(+), 39 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
index 0960dcb703..6e7bfd895e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
@@ -40,14 +40,14 @@ public class CostCalculator {
     /**
      * Constructor.
      */
-    public double calculateCost(GroupExpression groupExpression) {
+    public static double calculateCost(GroupExpression groupExpression) {
         PlanContext planContext = new PlanContext(groupExpression);
         CostEstimator costCalculator = new CostEstimator();
         CostEstimate costEstimate = groupExpression.getOperator().accept(costCalculator, planContext);
         return costFormula(costEstimate);
     }
 
-    private double costFormula(CostEstimate costEstimate) {
+    private static double costFormula(CostEstimate costEstimate) {
         double cpuCostWeight = 1;
         double memoryCostWeight = 1;
         double networkCostWeight = 1;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java
index 0aee081b9b..62143398e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java
@@ -26,7 +26,7 @@ import org.apache.doris.nereids.properties.PhysicalProperties;
 public class JobContext {
     private final PlannerContext plannerContext;
     private final PhysicalProperties requiredProperties;
-    private final double costUpperBound;
+    private double costUpperBound;
 
     public JobContext(PlannerContext plannerContext, PhysicalProperties requiredProperties, double costUpperBound) {
         this.plannerContext = plannerContext;
@@ -45,4 +45,8 @@ public class JobContext {
     public double getCostUpperBound() {
         return costUpperBound;
     }
+
+    public void setCostUpperBound(double costUpperBound) {
+        this.costUpperBound = costUpperBound;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
index ee72328afc..5516525233 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
@@ -17,18 +17,49 @@
 
 package org.apache.doris.nereids.jobs.cascades;
 
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.cost.CostCalculator;
 import org.apache.doris.nereids.jobs.Job;
 import org.apache.doris.nereids.jobs.JobContext;
 import org.apache.doris.nereids.jobs.JobType;
 import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.ChildrenOutputPropertyDeriver;
+import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper;
+import org.apache.doris.nereids.properties.ParentRequiredPropertyDeriver;
+import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.plans.Plan;
 
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Optional;
+
 /**
  * Job to compute cost and add enforcer.
  */
 public class CostAndEnforcerJob extends Job<Plan> {
+    // GroupExpression to optimize
     private final GroupExpression groupExpression;
+    // Current total cost
+    private double curTotalCost;
+
+    // Properties from parent plan node.
+    // Like: Physical Hash Join
+    // [ [Properties ["", ANY], Properties ["", BROADCAST]],
+    //   [Properties ["", SHUFFLE_JOIN], Properties ["", SHUFFLE_JOIN]] ]
+    private List<List<PhysicalProperties>> propertiesListList;
+
+    private List<GroupExpression> childrenBestGroupExprList;
+    private List<PhysicalProperties> childrenOutputProperties = Lists.newArrayList();
+
+    // Current stage of enumeration through child groups
+    private int curChildIndex = -1;
+    // Indicator of last child group that we waited for optimization
+    private int prevChildIndex = -1;
+    // Current stage of enumeration through outputInputProperties
+    private int curPropertyPairIndex = 0;
 
     public CostAndEnforcerJob(GroupExpression groupExpression, JobContext context) {
         super(JobType.OPTIMIZE_CHILDREN, context);
@@ -47,4 +78,162 @@ public class CostAndEnforcerJob extends Job<Plan> {
             }
         }
     }
+
+    /**
+     * Cost groupExpression under each candidate list of child properties, optimizing children on demand.
+     */
+    public void execute1() {
+        // Do init logic of root operator/groupExpr of `subplan`, only run once per task (-1 means not started).
+        if (curChildIndex == -1) {
+            curTotalCost = 0;
+
+            // Get property from groupExpression operator (it's root of subplan).
+            ParentRequiredPropertyDeriver parentRequiredPropertyDeriver = new ParentRequiredPropertyDeriver(context);
+            propertiesListList = parentRequiredPropertyDeriver.getRequiredPropertyListList(groupExpression);
+
+            curChildIndex = 0;
+        }
+
+        for (; curPropertyPairIndex < propertiesListList.size(); curPropertyPairIndex++) {
+            // children input properties
+            List<PhysicalProperties> childrenInputProperties = propertiesListList.get(curPropertyPairIndex);
+
+            // Calculate cost of groupExpression and update total cost
+            if (curChildIndex == 0 && prevChildIndex == -1) {
+                curTotalCost += CostCalculator.calculateCost(groupExpression);
+            }
+
+            for (; curChildIndex < groupExpression.arity(); curChildIndex++) {
+                PhysicalProperties childInputProperties = childrenInputProperties.get(curChildIndex);
+                Group childGroup = groupExpression.child(curChildIndex);
+
+                // Whether the child group was optimized for this childInputProperties according to
+                // the result of returning.
+                Optional<Pair<Double, GroupExpression>> lowestCostPlanOpt = childGroup.getLowestCostPlan(
+                        childInputProperties);
+
+                if (!lowestCostPlanOpt.isPresent()) {
+                    // The child should be pruned due to cost prune.
+                    if (prevChildIndex >= curChildIndex) {
+                        break;
+                    }
+
+                    // This child isn't optimized, create new tasks to optimize it.
+                    // Meaning that optimize recursively by derive tasks.
+                    prevChildIndex = curChildIndex;
+                    pushTask((CostAndEnforcerJob) clone());
+                    double newCostUpperBound = context.getCostUpperBound() - curTotalCost;
+                    JobContext jobContext = new JobContext(context.getPlannerContext(), childInputProperties,
+                            newCostUpperBound);
+                    pushTask(new OptimizeGroupJob(childGroup, jobContext));
+                    return;
+                }
+
+                GroupExpression lowestCostExpr = lowestCostPlanOpt.get().second;
+
+                PhysicalProperties childOutputProperty = lowestCostExpr.getPropertyFromMap(childInputProperties);
+                // TODO: maybe need to record children lowestCostExpr
+                childrenInputProperties.set(curChildIndex, childOutputProperty);
+
+                // todo: check whether split agg broadcast row count limit.
+
+                curTotalCost += lowestCostExpr.getLowestCostTable().get(childInputProperties).first;
+                if (curTotalCost > context.getCostUpperBound()) {
+                    break;
+                }
+            }
+
+            // When we successfully optimize all child group, it's last child.
+            if (curChildIndex == groupExpression.arity()) {
+                // Not need to do pruning here because it has been done when we get the
+                // best expr from the child group
+
+                // TODO: it could update the cost.
+                PhysicalProperties outputProperty = ChildrenOutputPropertyDeriver.getProperties(
+                        context.getRequiredProperties(),
+                        childrenOutputProperties, groupExpression);
+
+                if (curTotalCost > context.getCostUpperBound()) {
+                    break;
+                }
+
+                /* update group statistics and costs. NOTE(review): returns if any child HAS stats; confirm. */
+                if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() != null)) {
+                    return;
+                }
+                PlanContext planContext = new PlanContext(groupExpression);
+                // TODO: calculate stats.
+                groupExpression.getParent().setStatistics(planContext.getStatistics());
+
+                enforce(outputProperty, childrenInputProperties);
+            }
+
+            // Reset child idx and total cost
+            prevChildIndex = -1;
+            curChildIndex = 0;
+            curTotalCost = 0;
+        }
+    }
+
+    private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> inputProperties) {
+        // Record what groupExpression provides by itself, then enforce only what the requirement still misses.
+        // groupExpression can satisfy its own output property
+        putProperty(groupExpression, outputProperty, outputProperty, inputProperties);
+        // groupExpression can satisfy the ANY type output property
+        putProperty(groupExpression, outputProperty, new PhysicalProperties(), inputProperties);
+
+        EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new EnforceMissingPropertiesHelper(context,
+                groupExpression, curTotalCost);
+
+        PhysicalProperties requiredProperties = context.getRequiredProperties();
+        if (!outputProperty.meet(requiredProperties)) {
+            Pair<PhysicalProperties, Double> pair = enforceMissingPropertiesHelper.enforceProperty(outputProperty,
+                    requiredProperties);
+            PhysicalProperties addEnforcedProperty = pair.first;
+            curTotalCost = pair.second;
+
+            // enforcedProperty is superset of requiredProperty
+            if (!addEnforcedProperty.equals(requiredProperties)) {
+                putProperty(groupExpression.getParent().getBestExpression(addEnforcedProperty),
+                        requiredProperties, requiredProperties, Lists.newArrayList(outputProperty));
+            }
+        } else {
+            if (!outputProperty.equals(requiredProperties)) {
+                putProperty(groupExpression, outputProperty, requiredProperties, inputProperties);
+            }
+        }
+
+        if (curTotalCost < context.getCostUpperBound()) {
+            context.setCostUpperBound(curTotalCost);
+        }
+    }
+
+    private void putProperty(GroupExpression groupExpression,
+            PhysicalProperties outputProperty,
+            PhysicalProperties requiredProperty,
+            List<PhysicalProperties> inputProperties) {
+        if (groupExpression.updateLowestCostTable(requiredProperty, inputProperties, curTotalCost)) {
+            // Each group expression need to record the outputProperty satisfy what requiredProperty,
+            // because group expression can generate multi outputProperty. eg. Join may have shuffle local
+            // and shuffle join two types outputProperty.
+            groupExpression.putOutputPropertiesMap(outputProperty, requiredProperty);
+        }
+        this.groupExpression.getParent().setBestPlan(groupExpression,
+                curTotalCost, requiredProperty);
+    }
+
+
+    /**
+     * Shallow clone: the copy shares propertiesListList and groupExpression with this job.
+     *
+     * @throws IllegalStateException if the job hierarchy does not support cloning
+     */
+    @Override
+    public Object clone() {
+        try {
+            return super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new IllegalStateException("CostAndEnforcerJob is not cloneable", e);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
index d2e3e2e86c..a9343cdd97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.statistics.StatsDeriveResult;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -50,6 +51,7 @@ public class Group {
     private double costLowerBound = -1;
     private boolean isExplored = false;
     private boolean hasCost = false;
+    private StatsDeriveResult statistics;
 
     /**
      * Constructor for Group.
@@ -135,6 +137,35 @@ public class Group {
         this.costLowerBound = costLowerBound;
     }
 
+    /**
+     * Set or update lowestCostPlans: properties --> new Pair<>(cost, expression)
+     */
+    public void setBestPlan(GroupExpression expression, double cost, PhysicalProperties properties) {
+        if (lowestCostPlans.containsKey(properties)) {
+            if (lowestCostPlans.get(properties).first > cost) {
+                lowestCostPlans.put(properties, new Pair<>(cost, expression));
+            }
+        } else {
+            lowestCostPlans.put(properties, new Pair<>(cost, expression));
+        }
+    }
+
+    public GroupExpression getBestExpression(PhysicalProperties properties) {
+        if (lowestCostPlans.containsKey(properties)) {
+            return lowestCostPlans.get(properties).second;
+        }
+        return null;
+    }
+
+    public StatsDeriveResult getStatistics() {
+        return statistics;
+    }
+
+    public void setStatistics(StatsDeriveResult statistics) {
+        this.statistics = statistics;
+    }
+
+
     public List<GroupExpression> getLogicalExpressions() {
         return logicalExpressions;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
index bb7da075cd..da4373de66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
@@ -44,6 +44,8 @@ public class GroupExpression {
 
     // Mapping from output properties to the corresponding best cost, statistics, and child properties.
     private final Map<PhysicalProperties, Pair<Double, List<PhysicalProperties>>> lowestCostTable;
+    // Maps each incoming required property to the output property this expression satisfies it with.
+    private final Map<PhysicalProperties, PhysicalProperties> requestPropertiesMap;
 
     public GroupExpression(Operator op) {
         this(op, Lists.newArrayList());
@@ -61,6 +63,14 @@ public class GroupExpression {
         this.ruleMasks = new BitSet(RuleType.SENTINEL.ordinal());
         this.statDerived = false;
         this.lowestCostTable = Maps.newHashMap();
+        this.requestPropertiesMap = Maps.newHashMap();
+    }
+
+    // TODO: rename
+    public PhysicalProperties getPropertyFromMap(PhysicalProperties requiredPropertySet) {
+        PhysicalProperties outputProperty = requestPropertiesMap.get(requiredPropertySet);
+        Preconditions.checkState(outputProperty != null);
+        return outputProperty;
     }
 
     public int arity() {
@@ -124,6 +134,30 @@ public class GroupExpression {
         return lowestCostTable.get(require).second;
     }
 
+    /**
+     * Add a (parentOutputProperties) -> (cost, childrenInputProperties) in lowestCostTable.
+     */
+    public boolean updateLowestCostTable(
+            PhysicalProperties parentOutputProperties,
+            List<PhysicalProperties> childrenInputProperties,
+            double cost) {
+        if (lowestCostTable.containsKey(parentOutputProperties)) {
+            if (lowestCostTable.get(parentOutputProperties).first > cost) {
+                lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
+                return true;
+            }
+        } else {
+            lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
+            return true;
+        }
+        return false;
+    }
+
+    public void putOutputPropertiesMap(PhysicalProperties outputPropertySet,
+            PhysicalProperties requiredPropertySet) {
+        this.requestPropertiesMap.put(requiredPropertySet, outputPropertySet);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
index 5924a75bdc..e07f4e46ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
@@ -193,4 +193,11 @@ public class Memo {
         }
         groups.remove(source);
     }
+
+    /**
+     * Add enforcer expression into the target group.
+     */
+    public void addEnforcerPlan(GroupExpression groupExpression, Group group) {
+        groupExpression.setParent(group);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
index ad112935cf..a7f72ca3b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
@@ -23,9 +23,9 @@ package org.apache.doris.nereids.operators;
  * 1. ANY: match any operator
  * 2. MULTI: match multiple operators
  * 3. FIXED: the leaf node of pattern tree, which can be matched by a single operator
- *         but this operator cannot be used in rules
+ * but this operator cannot be used in rules
  * 4. MULTI_FIXED: the leaf node of pattern tree, which can be matched by multiple operators,
- *        but these operators cannot be used in rules
+ * but these operators cannot be used in rules
  */
 public enum OperatorType {
     UNKNOWN,
@@ -48,5 +48,6 @@ public enum OperatorType {
     PHYSICAL_AGGREGATION,
     PHYSICAL_SORT,
     PHYSICAL_HASH_JOIN,
-    PHYSICAL_EXCHANGE;
+    PHYSICAL_EXCHANGE,
+    PHYSICAL_DISTRIBUTION;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java
similarity index 58%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java
index 4122de6901..64c23ee024 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java
@@ -15,29 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.properties;
+package org.apache.doris.nereids.operators.plans.physical;
 
-import org.apache.doris.planner.DataPartition;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.properties.DistributionSpec;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import java.util.List;
 
 /**
- * Base class for data distribution.
+ * Enforcer operator.
  */
-public class DistributionSpec {
-
-    private DataPartition dataPartition;
+public class PhysicalDistribution extends PhysicalUnaryOperator {
 
-    public DistributionSpec() {
-    }
+    protected DistributionSpec distributionSpec;
 
-    public DistributionSpec(DataPartition dataPartition) {
-        this.dataPartition = dataPartition;
-    }
 
-    public DataPartition getDataPartition() {
-        return dataPartition;
+    public PhysicalDistribution(DistributionSpec spec) {
+        super(OperatorType.PHYSICAL_DISTRIBUTION);
     }
 
-    public void setDataPartition(DataPartition dataPartition) {
-        this.dataPartition = dataPartition;
+    @Override
+    public List<Expression> getExpressions() {
+        return null;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java
new file mode 100644
index 0000000000..6df2cfe15c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.operators.Operator;
+import org.apache.doris.nereids.operators.OperatorVisitor;
+
+import java.util.List;
+
+/**
+ * Used for property derivation.
+ */
+public class ChildrenOutputPropertyDeriver extends OperatorVisitor<PhysicalProperties, PlanContext> {
+    PhysicalProperties requirements;
+    List<PhysicalProperties> childrenOutputProperties;
+
+    public ChildrenOutputPropertyDeriver(PhysicalProperties requirements,
+            List<PhysicalProperties> childrenOutputProperties) {
+        this.childrenOutputProperties = childrenOutputProperties;
+        this.requirements = requirements;
+    }
+
+    public static PhysicalProperties getProperties(
+            PhysicalProperties requirements,
+            List<PhysicalProperties> childrenOutputProperties,
+            GroupExpression groupExpression) {
+
+        ChildrenOutputPropertyDeriver childrenOutputPropertyDeriver = new ChildrenOutputPropertyDeriver(requirements,
+                childrenOutputProperties);
+
+        return groupExpression.getOperator().accept(childrenOutputPropertyDeriver, new PlanContext(groupExpression));
+    }
+
+    public PhysicalProperties getRequirements() {
+        return requirements;
+    }
+
+    //    public List<List<PhysicalProperties>> getProperties(GroupExpression groupExpression) {
+    //        properties = Lists.newArrayList();
+    //        groupExpression.getOperator().accept(this, new PlanContext(groupExpression));
+    //        return properties;
+    //    }
+
+    //    @Override
+    //    public Void visitOperator(Operator operator, PlanContext context) {
+    //        List<PhysicalProperties> props = Lists.newArrayList();
+    //        for (int childIndex = 0; childIndex < context.getGroupExpression().arity(); ++childIndex) {
+    //            props.add(new PhysicalProperties());
+    //        }
+    //        properties.add(props);
+    //        return null;
+    //    }
+    @Override
+    public PhysicalProperties visitOperator(Operator node, PlanContext context) {
+        return new PhysicalProperties();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
index 4122de6901..cd0ea5a438 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
@@ -17,15 +17,21 @@
 
 package org.apache.doris.nereids.properties;
 
+import org.apache.doris.nereids.memo.Group;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalDistribution;
 import org.apache.doris.planner.DataPartition;
 
+import com.google.common.collect.Lists;
+
 /**
- * Base class for data distribution.
+ * Spec of data distribution.
  */
 public class DistributionSpec {
 
     private DataPartition dataPartition;
 
+    // TODO: why exist?
     public DistributionSpec() {
     }
 
@@ -33,6 +39,16 @@ public class DistributionSpec {
         this.dataPartition = dataPartition;
     }
 
+    /**
+     * TODO: need read ORCA.
+     * Whether the current `DistributionSpec` satisfies the other `DistributionSpec`.
+     *
+     * @param other another DistributionSpec.
+     */
+    public boolean meet(DistributionSpec other) {
+        return false;
+    }
+
     public DataPartition getDataPartition() {
         return dataPartition;
     }
@@ -40,4 +56,15 @@ public class DistributionSpec {
     public void setDataPartition(DataPartition dataPartition) {
         this.dataPartition = dataPartition;
     }
+
+    public GroupExpression addEnforcer(Group child) {
+        return new GroupExpression(new PhysicalDistribution(new DistributionSpec(dataPartition)),
+                Lists.newArrayList(child));
+    }
+
+    // TODO
+    @Override
+    public boolean equals(Object obj) {
+        return super.equals(obj);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
new file mode 100644
index 0000000000..aaa7a0d113
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.cost.CostCalculator;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+
+import com.google.common.collect.Lists;
+
+/**
+ * When the parent node requests properties that the child output does not provide,
+ * add enforcers that supply the missing properties.
+ */
+public class EnforceMissingPropertiesHelper {
+
+    private JobContext context;
+    private GroupExpression groupExpression;
+    private double curTotalCost;
+
+    public EnforceMissingPropertiesHelper(JobContext context, GroupExpression groupExpression,
+            double curTotalCost) {
+        this.context = context;
+        this.groupExpression = groupExpression;
+        this.curTotalCost = curTotalCost;
+    }
+
+    /**
+     * Enforce missing property.
+     */
+    public Pair<PhysicalProperties, Double> enforceProperty(PhysicalProperties output, PhysicalProperties required) {
+        boolean isMeetOrder = output.getOrderSpec().meet(required.getOrderSpec());
+        boolean isMeetDistribution = output.getDistributionSpec().meet(required.getDistributionSpec());
+
+        if (!isMeetDistribution && !isMeetOrder) {
+            return new Pair<>(enforceSortAndDistribution(output, required), curTotalCost);
+        } else if (isMeetDistribution && isMeetOrder) {
+            return new Pair<>(null, curTotalCost);
+        } else if (!isMeetDistribution) {
+            if (required.getOrderSpec().getOrderKeys().isEmpty()) {
+                return new Pair<>(enforceDistribution(output), curTotalCost);
+            } else {
+                // TODO
+                // It's wrong that SortSpec is empty.
+                // After redistribute data , original order requirement may be wrong. Need enforce "SortNode" here.
+                // PhysicalProperties newProperty =
+                //         new PhysicalProperties(new DistributionSpec(), new OrderSpec(Lists.newArrayList()));
+                // groupExpression.getParent().
+                // return enforceSortAndDistribution(newProperty, required);
+                return new Pair<>(enforceDistribution(output), curTotalCost);
+            }
+        } else {
+            return new Pair<>(enforceSort(output), curTotalCost);
+        }
+    }
+
+    private PhysicalProperties enforceSort(PhysicalProperties oldOutputProperty) {
+        // clone
+        PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
+                oldOutputProperty.getOrderSpec());
+        newOutputProperty.setOrderSpec(context.getRequiredProperties().getOrderSpec());
+        GroupExpression enforcer =
+                context.getRequiredProperties().getOrderSpec().addEnforcer(groupExpression.getParent());
+
+        updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
+
+        return newOutputProperty;
+    }
+
+    private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputProperty) {
+        PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
+                oldOutputProperty.getOrderSpec());
+        newOutputProperty.setDistributionSpec(context.getRequiredProperties().getDistributionSpec());
+        GroupExpression enforcer =
+                context.getRequiredProperties().getDistributionSpec().addEnforcer(groupExpression.getParent());
+
+        updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
+
+        return newOutputProperty;
+    }
+
+    private void updateCostWithEnforcer(GroupExpression enforcer,
+            PhysicalProperties oldOutputProperty,
+            PhysicalProperties newOutputProperty) {
+        context.getPlannerContext().getMemo().addEnforcerPlan(enforcer, groupExpression.getParent());
+        curTotalCost += CostCalculator.calculateCost(enforcer);
+
+        if (enforcer.updateLowestCostTable(newOutputProperty, Lists.newArrayList(oldOutputProperty), curTotalCost)) {
+            enforcer.putOutputPropertiesMap(newOutputProperty, newOutputProperty);
+        }
+        groupExpression.getParent().setBestPlan(enforcer, curTotalCost, newOutputProperty);
+    }
+
+    private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty,
+            PhysicalProperties requiredProperty) {
+        // DistributionSpec.equals is identity-based, so test the type instead of a fresh instance.
+        boolean gatherRequired = requiredProperty.getDistributionSpec() instanceof GatherDistributionSpec;
+        PhysicalProperties enforcedProperty;
+        if (gatherRequired) {
+            // sort first, then gather
+            enforcedProperty = enforceDistribution(enforceSort(outputProperty));
+        } else {
+            // redistribute first, then sort
+            enforcedProperty = enforceSort(enforceDistribution(outputProperty));
+        }
+        return enforcedProperty;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
similarity index 69%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
index 50899b7f31..e197ade368 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
@@ -18,18 +18,11 @@
 package org.apache.doris.nereids.properties;
 
 /**
- * Physical properties used in cascades.
+ * Distribution spec for a gather, i.e. a re-shuffle of the data; it declares no state of its own.
  */
-public class PhysicalProperties {
-    private DistributionSpec distributionDesc;
+public class GatherDistributionSpec extends DistributionSpec {
 
-    public PhysicalProperties() {}
-
-    public DistributionSpec getDistributionDesc() {
-        return distributionDesc;
-    }
-
-    public void setDistributionDesc(DistributionSpec distributionDesc) {
-        this.distributionDesc = distributionDesc;
+    /** Creates the spec; it only delegates to the no-arg {@link DistributionSpec} constructor. */
+    public GatherDistributionSpec() {
+        super();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
index b9923db80a..7d6ee6eb8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
@@ -42,6 +42,15 @@ public class OrderKey {
         this.nullFirst = nullFirst;
     }
 
+    /**
+     * Checks whether this key and {@code other} describe the same ordering: equal expression,
+     * same direction and same null placement.
+     *
+     * @param other another OrderKey.
+     * @return true when all three attributes agree.
+     */
+    public boolean matches(OrderKey other) {
+        if (!expr.equals(other.expr)) {
+            return false;
+        }
+        return isAsc == other.isAsc && nullFirst == other.nullFirst;
+    }
+
     public Expression getExpr() {
         return expr;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
new file mode 100644
index 0000000000..1d1c3777ae
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.memo.Group;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHeapSort;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Spec of sort order: an ordered list of {@link OrderKey}s.
+ */
+public class OrderSpec {
+    private final List<OrderKey> orderKeys;
+
+    /**
+     * Creates a spec over the given keys; the list is stored as-is, without a defensive copy.
+     *
+     * @param orderKeys keys of this order, most significant first.
+     */
+    public OrderSpec(List<OrderKey> orderKeys) {
+        this.orderKeys = orderKeys;
+    }
+
+    /**
+     * Whether this {@code OrderSpec} satisfies {@code other}, i.e. the keys of {@code other} form a prefix of
+     * this spec's keys when compared position by position with {@link OrderKey#matches(OrderKey)}.
+     *
+     * @param other another OrderSpec.
+     * @return true when every key of {@code other} is matched by the key at the same position here.
+     */
+    public boolean meet(OrderSpec other) {
+        List<OrderKey> otherKeys = other.getOrderKeys();
+        if (otherKeys.size() > orderKeys.size()) {
+            return false;
+        }
+        int pos = 0;
+        for (OrderKey otherKey : otherKeys) {
+            if (!orderKeys.get(pos++).matches(otherKey)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Builds the enforcer for this order: a {@link PhysicalHeapSort} over these keys with {@code child} as its
+     * only child group.
+     * NOTE(review): -1 and 0 presumably stand for "no limit" and "offset 0" — confirm against PhysicalHeapSort.
+     *
+     * @param child group the sort is placed on top of.
+     * @return a new GroupExpression wrapping the sort operator.
+     */
+    public GroupExpression addEnforcer(Group child) {
+        PhysicalHeapSort sortOperator = new PhysicalHeapSort(orderKeys, -1, 0);
+        return new GroupExpression(sortOperator, Lists.newArrayList(child));
+    }
+
+    /**
+     * Returns the list handed to the constructor (the same instance, not a copy).
+     */
+    public List<OrderKey> getOrderKeys() {
+        return orderKeys;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java
new file mode 100644
index 0000000000..681500ed69
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.operators.Operator;
+import org.apache.doris.nereids.operators.OperatorVisitor;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Derives the property lists required for the children of a group expression.
+ *
+ * <p>For any operator the visitor below produces a single list holding one default-constructed
+ * {@link PhysicalProperties} per {@code arity()} slot of the group expression.
+ */
+public class ParentRequiredPropertyDeriver extends OperatorVisitor<Void, PlanContext> {
+
+    // Required properties taken from the job context; stored here but not read by the code in this class.
+    PhysicalProperties requestPropertyFromParent;
+    // Result accumulator; replaced by a fresh list on every getRequiredPropertyListList call.
+    List<List<PhysicalProperties>> requiredPropertyListList;
+
+    public ParentRequiredPropertyDeriver(JobContext context) {
+        requestPropertyFromParent = context.getRequiredProperties();
+    }
+
+    /**
+     * Visits the operator of {@code groupExpression} and returns the property lists collected by the visit.
+     *
+     * @param groupExpression expression whose operator is visited.
+     * @return the alternatives collected during the visit, each holding one entry per arity slot.
+     */
+    public List<List<PhysicalProperties>> getRequiredPropertyListList(GroupExpression groupExpression) {
+        requiredPropertyListList = Lists.newArrayList();
+        Operator operator = groupExpression.getOperator();
+        operator.accept(this, new PlanContext(groupExpression));
+        return requiredPropertyListList;
+    }
+
+    @Override
+    public Void visitOperator(Operator operator, PlanContext context) {
+        List<PhysicalProperties> childRequirements = Lists.newArrayList();
+        int slot = 0;
+        while (slot < context.getGroupExpression().arity()) {
+            // Default-constructed properties: both specs are left unset.
+            childRequirements.add(new PhysicalProperties());
+            slot++;
+        }
+        requiredPropertyListList.add(childRequirements);
+        return null;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 50899b7f31..75b02d7945 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -19,17 +19,40 @@ package org.apache.doris.nereids.properties;
 
 /**
  * Physical properties used in cascades.
+ * Bundles an {@link OrderSpec} with a {@link DistributionSpec}; either may be null when unset.
+ * TODO(wj): Do we need a `PhysicalPropertySpec` interface like NoisePage?
  */
 public class PhysicalProperties {
-    private DistributionSpec distributionDesc;
+    private OrderSpec orderSpec;
 
-    public PhysicalProperties() {}
+    private DistributionSpec distributionSpec;
 
-    public DistributionSpec getDistributionDesc() {
-        return distributionDesc;
+    /**
+     * Creates properties with both specs left null.
+     */
+    public PhysicalProperties() {
     }
 
-    public void setDistributionDesc(DistributionSpec distributionDesc) {
-        this.distributionDesc = distributionDesc;
+    public PhysicalProperties(DistributionSpec distributionSpec, OrderSpec orderSpec) {
+        this.distributionSpec = distributionSpec;
+        this.orderSpec = orderSpec;
+    }
+
+    /**
+     * Whether these properties satisfy {@code other}: both the order spec and the distribution spec must meet
+     * their counterpart in {@code other}.
+     *
+     * <p>A default-constructed instance has null specs, so nulls are handled instead of dereferenced: a null
+     * spec in {@code other} is treated as "nothing required", and a null spec on this side cannot satisfy a
+     * non-null requirement.
+     * NOTE(review): confirm that null is meant to stand for "no requirement".
+     *
+     * @param other the properties to satisfy.
+     * @return true when both specs of {@code other} are satisfied.
+     */
+    public boolean meet(PhysicalProperties other) {
+        // TODO: handle distributionSpec meet()
+        return meetOrder(other.orderSpec) && meetDistribution(other.distributionSpec);
+    }
+
+    // Null-safe wrapper around OrderSpec.meet().
+    private boolean meetOrder(OrderSpec required) {
+        if (required == null) {
+            return true;
+        }
+        return orderSpec != null && orderSpec.meet(required);
+    }
+
+    // Null-safe wrapper around DistributionSpec.meet().
+    private boolean meetDistribution(DistributionSpec required) {
+        if (required == null) {
+            return true;
+        }
+        return distributionSpec != null && distributionSpec.meet(required);
+    }
+
+    public OrderSpec getOrderSpec() {
+        return orderSpec;
+    }
+
+    public void setOrderSpec(OrderSpec orderSpec) {
+        this.orderSpec = orderSpec;
+    }
+
+    public DistributionSpec getDistributionSpec() {
+        return distributionSpec;
+    }
+
+    public void setDistributionSpec(DistributionSpec distributionSpec) {
+        this.distributionSpec = distributionSpec;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org