You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by hu...@apache.org on 2022/07/31 09:22:49 UTC

[doris] branch master updated: [feature](nereids): polish property deriver enforcer job (#11222)

This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e20e6624ab [feature](nereids): polish property deriver enforcer job (#11222)
e20e6624ab is described below

commit e20e6624ab07d1577f96d5b3694e43543894be05
Author: jakevin <ja...@gmail.com>
AuthorDate: Sun Jul 31 17:22:41 2022 +0800

    [feature](nereids): polish property deriver enforcer job (#11222)
    
    Polish property deriver enforcer job
---
 .../java/org/apache/doris/nereids/PlanContext.java |   5 +
 .../apache/doris/nereids/cost/CostCalculator.java  |  61 +++++++---
 .../nereids/jobs/cascades/CostAndEnforcerJob.java  |  81 ++++++++-----
 .../apache/doris/nereids/memo/GroupExpression.java |  12 +-
 .../properties/ChildOutputPropertyDeriver.java     | 106 +++++++++++++++++
 .../properties/ChildrenOutputPropertyDeriver.java  |  74 ------------
 .../doris/nereids/properties/DistributionSpec.java |  60 +++++-----
 ...tributionSpec.java => DistributionSpecAny.java} |  28 +++--
 ...butionDesc.java => DistributionSpecGather.java} |  12 +-
 .../nereids/properties/DistributionSpecHash.java   | 113 ++++++++++++++++++
 ...onSpec.java => DistributionSpecReplicated.java} |  11 +-
 .../properties/EnforceMissingPropertiesHelper.java |  19 +--
 .../apache/doris/nereids/properties/OrderSpec.java |  17 +++
 .../properties/ParentRequiredPropertyDeriver.java  |  58 ----------
 .../nereids/properties/PhysicalProperties.java     |  38 ++++--
 .../nereids/properties/RequestPropertyDeriver.java | 127 +++++++++++++++++++++
 .../rules/exploration/join/JoinLAsscom.java        |   3 +-
 .../rules/exploration/join/JoinProjectLAsscom.java |   5 +-
 .../apache/doris/nereids/trees/plans/JoinType.java |  26 +++--
 .../apache/doris/nereids/util/ExpressionUtils.java |  24 ----
 .../org/apache/doris/nereids/util/JoinUtils.java   | 123 ++++++++++++++++++++
 .../java/org/apache/doris/nereids/util/Utils.java  |  12 ++
 .../doris/nereids/jobs/CostAndEnforcerJobTest.java | 114 ++++++++++++++++++
 23 files changed, 841 insertions(+), 288 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
index 9f37171747..9d352d7027 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
@@ -19,6 +19,7 @@
 package org.apache.doris.nereids;
 
 import org.apache.doris.common.Id;
+import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Slot;
@@ -52,6 +53,10 @@ public class PlanContext {
 
     public PlanContext(GroupExpression groupExpression) {
         this.groupExpression = groupExpression;
+
+        for (Group group : groupExpression.children()) {
+            childrenStats.add(group.getStatistics());
+        }
     }
 
     public Plan getPlan() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
index 0950ed9643..055232bda1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
@@ -22,7 +22,9 @@ import org.apache.doris.nereids.PlanContext;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribution;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -62,17 +64,49 @@ public class CostCalculator {
         }
 
         @Override
-        public CostEstimate visitPhysicalAggregate(PhysicalAggregate<Plan> aggregate, PlanContext context) {
+        public CostEstimate visitPhysicalOlapScan(PhysicalOlapScan physicalOlapScan, PlanContext context) {
             StatsDeriveResult statistics = context.getStatisticsWithCheck();
             return CostEstimate.ofCpu(statistics.computeSize());
         }
 
         @Override
-        public CostEstimate visitPhysicalOlapScan(PhysicalOlapScan physicalOlapScan, PlanContext context) {
+        public CostEstimate visitPhysicalProject(PhysicalProject physicalProject, PlanContext context) {
             StatsDeriveResult statistics = context.getStatisticsWithCheck();
             return CostEstimate.ofCpu(statistics.computeSize());
         }
 
+        @Override
+        public CostEstimate visitPhysicalHeapSort(PhysicalHeapSort physicalHeapSort, PlanContext context) {
+            // TODO: consider two-phase sort and enforcer.
+            StatsDeriveResult statistics = context.getStatisticsWithCheck();
+            StatsDeriveResult childStatistics = context.getChildStatistics(0);
+
+            return new CostEstimate(
+                    childStatistics.computeSize(),
+                    statistics.computeSize(),
+                    childStatistics.computeSize());
+        }
+
+        @Override
+        public CostEstimate visitPhysicalDistribution(PhysicalDistribution physicalDistribution, PlanContext context) {
+            StatsDeriveResult statistics = context.getStatisticsWithCheck();
+            StatsDeriveResult childStatistics = context.getChildStatistics(0);
+
+            return new CostEstimate(
+                    childStatistics.computeSize(),
+                    statistics.computeSize(),
+                    childStatistics.computeSize());
+        }
+
+        @Override
+        public CostEstimate visitPhysicalAggregate(PhysicalAggregate<Plan> aggregate, PlanContext context) {
+            // TODO: stage.....
+
+            StatsDeriveResult statistics = context.getStatisticsWithCheck();
+            StatsDeriveResult inputStatistics = context.getChildStatistics(0);
+            return new CostEstimate(inputStatistics.computeSize(), statistics.computeSize(), 0);
+        }
+
         @Override
         public CostEstimate visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> physicalHashJoin, PlanContext context) {
             Preconditions.checkState(context.getGroupExpression().arity() == 2);
@@ -80,23 +114,24 @@ public class CostCalculator {
 
             StatsDeriveResult leftStatistics = context.getChildStatistics(0);
             StatsDeriveResult rightStatistics = context.getChildStatistics(1);
-
-            // TODO: handle some case
-
             List<Id> leftIds = context.getChildOutputIds(0);
             List<Id> rightIds = context.getChildOutputIds(1);
 
+            // TODO: handle some case
             // handle cross join, onClause is empty .....
-
+            if (physicalHashJoin.getJoinType().isCrossJoin()) {
+                return new CostEstimate(
+                        leftStatistics.computeColumnSize(leftIds) + rightStatistics.computeColumnSize(rightIds),
+                        rightStatistics.computeColumnSize(rightIds),
+                        0);
+            }
+
+            // TODO: network 0?
             return new CostEstimate(
-                    leftStatistics.computeColumnSize(leftIds) + rightStatistics.computeColumnSize(rightIds),
-                    rightStatistics.computeColumnSize(rightIds), 0);
+                    (leftStatistics.computeColumnSize(leftIds) + rightStatistics.computeColumnSize(rightIds)) / 2,
+                    rightStatistics.computeColumnSize(rightIds),
+                    0);
         }
 
-        @Override
-        public CostEstimate visitPhysicalProject(PhysicalProject physicalProject, PlanContext context) {
-            StatsDeriveResult statistics = context.getStatisticsWithCheck();
-            return CostEstimate.ofCpu(statistics.computeSize());
-        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
index 520f219522..cb3f88cafe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
@@ -25,10 +25,10 @@ import org.apache.doris.nereids.jobs.JobContext;
 import org.apache.doris.nereids.jobs.JobType;
 import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.GroupExpression;
-import org.apache.doris.nereids.properties.ChildrenOutputPropertyDeriver;
+import org.apache.doris.nereids.properties.ChildOutputPropertyDeriver;
 import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper;
-import org.apache.doris.nereids.properties.ParentRequiredPropertyDeriver;
 import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.properties.RequestPropertyDeriver;
 
 import com.google.common.collect.Lists;
 
@@ -37,17 +37,18 @@ import java.util.Optional;
 
 /**
  * Job to compute cost and add enforcer.
+ * Inspired by NoisePage and ORCA-Paper.
  */
-public class CostAndEnforcerJob extends Job {
+public class CostAndEnforcerJob extends Job implements Cloneable {
     // GroupExpression to optimize
     private final GroupExpression groupExpression;
     // Current total cost
     private double curTotalCost;
 
-    // Properties from parent plan node.
-    // Like: Physical Hash Join
-    // [ [Properties ["", ANY], Properties ["", BROADCAST]],
-    //   [Properties ["", SHUFFLE_JOIN], Properties ["", SHUFFLE_JOIN]] ]
+    // Children properties from parent plan node.
+    // Example: Physical Hash Join
+    // [ [Properties {"", ANY}, Properties {"", BROADCAST}],
+    //   [Properties {"", SHUFFLE_JOIN}, Properties {"", SHUFFLE_JOIN}]]
     private List<List<PhysicalProperties>> propertiesListList;
 
     private List<GroupExpression> childrenBestGroupExprList;
@@ -78,17 +79,43 @@ public class CostAndEnforcerJob extends Job {
         }
     }
 
+    /*-
+     * Please read the ORCA paper
+     * - 4.1.4 Optimization.
+     * - Figure 7
+     *
+     *                currentJobSubPlanRoot
+     *             / ▲                     ▲ \
+     *   requested/ /childOutput childOutput\ \requested
+     * Properties/ /Properties     Properties\ \Properties
+     *          ▼ /                           \ ▼
+     *        child                           child
+     *
+     *
+     *         requestPropertyFromParent          parentPlanNode
+     *    ──►              │               ──►          ▲
+     *                     ▼                            │
+     *         requestPropertyToChildren        ChildOutputProperty
+     *
+     *         requestPropertyFromParent
+     *                     ┼
+     *    ──►             gap              ──►  add enforcer to fill the gap
+     *                     ┼
+     *            ChildOutputProperty
+     */
+
     /**
      * execute.
      */
+    //    @Override
     public void execute1() {
         // Do init logic of root plan/groupExpr of `subplan`, only run once per task.
-        if (curChildIndex != -1) {
+        if (curChildIndex == -1) {
             curTotalCost = 0;
 
             // Get property from groupExpression plan (it's root of subplan).
-            ParentRequiredPropertyDeriver parentRequiredPropertyDeriver = new ParentRequiredPropertyDeriver(context);
-            propertiesListList = parentRequiredPropertyDeriver.getRequiredPropertyListList(groupExpression);
+            RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(context);
+            propertiesListList = requestPropertyDeriver.getRequiredPropertyListList(groupExpression);
 
             curChildIndex = 0;
         }
@@ -135,7 +162,6 @@ public class CostAndEnforcerJob extends Job {
                 childrenInputProperties.set(curChildIndex, childOutputProperty);
 
                 // todo: check whether split agg broadcast row count limit.
-
                 curTotalCost += lowestCostExpr.getLowestCostTable().get(childInputProperties).first;
                 if (curTotalCost > context.getCostUpperBound()) {
                     break;
@@ -148,7 +174,7 @@ public class CostAndEnforcerJob extends Job {
                 // best expr from the child group
 
                 // TODO: it could update the cost.
-                PhysicalProperties outputProperty = ChildrenOutputPropertyDeriver.getProperties(
+                PhysicalProperties outputProperty = ChildOutputPropertyDeriver.getProperties(
                         context.getRequiredProperties(),
                         childrenOutputProperties, groupExpression);
 
@@ -161,10 +187,14 @@ public class CostAndEnforcerJob extends Job {
                     return;
                 }
                 PlanContext planContext = new PlanContext(groupExpression);
-                // TODO: calculate stats.
+                // TODO: calculate stats. ??????
                 groupExpression.getOwnerGroup().setStatistics(planContext.getStatistics());
 
                 enforce(outputProperty, childrenInputProperties);
+
+                if (curTotalCost < context.getCostUpperBound()) {
+                    context.setCostUpperBound(curTotalCost);
+                }
             }
 
             // Reset child idx and total cost
@@ -174,37 +204,33 @@ public class CostAndEnforcerJob extends Job {
         }
     }
 
-    private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> inputProperties) {
+    private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> childrenInputProperties) {
 
         // groupExpression can satisfy its own output property
-        putProperty(groupExpression, outputProperty, outputProperty, inputProperties);
+        putProperty(groupExpression, outputProperty, outputProperty, childrenInputProperties);
         // groupExpression can satisfy the ANY type output property
-        putProperty(groupExpression, outputProperty, new PhysicalProperties(), inputProperties);
+        putProperty(groupExpression, outputProperty, new PhysicalProperties(), childrenInputProperties);
 
         EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new EnforceMissingPropertiesHelper(context,
                 groupExpression, curTotalCost);
 
-        PhysicalProperties requiredProperties = context.getRequiredProperties();
-        if (outputProperty.meet(requiredProperties)) {
+        PhysicalProperties requestedProperties = context.getRequiredProperties();
+        if (!outputProperty.meet(requestedProperties)) {
             Pair<PhysicalProperties, Double> pair = enforceMissingPropertiesHelper.enforceProperty(outputProperty,
-                    requiredProperties);
+                    requestedProperties);
             PhysicalProperties addEnforcedProperty = pair.first;
             curTotalCost = pair.second;
 
             // enforcedProperty is superset of requiredProperty
-            if (!addEnforcedProperty.equals(requiredProperties)) {
+            if (!addEnforcedProperty.equals(requestedProperties)) {
                 putProperty(groupExpression.getOwnerGroup().getBestExpression(addEnforcedProperty),
-                        requiredProperties, requiredProperties, Lists.newArrayList(outputProperty));
+                        requestedProperties, requestedProperties, Lists.newArrayList(outputProperty));
             }
         } else {
-            if (!outputProperty.equals(requiredProperties)) {
-                putProperty(groupExpression, outputProperty, requiredProperties, inputProperties);
+            if (!outputProperty.equals(requestedProperties)) {
+                putProperty(groupExpression, outputProperty, requestedProperties, childrenInputProperties);
             }
         }
-
-        if (curTotalCost < context.getCostUpperBound()) {
-            context.setCostUpperBound(curTotalCost);
-        }
     }
 
     private void putProperty(GroupExpression groupExpression,
@@ -231,6 +257,7 @@ public class CostAndEnforcerJob extends Job {
         try {
             task = (CostAndEnforcerJob) super.clone();
         } catch (CloneNotSupportedException ignored) {
+            ignored.printStackTrace();
             return null;
         }
         return task;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
index 27dbb0257d..d7a1508ace 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
@@ -136,19 +136,19 @@ public class GroupExpression {
     }
 
     /**
-     * Add a (parentOutputProperties) -> (cost, childrenInputProperties) in lowestCostTable.
+     * Add a (outputProperties) -> (cost, childrenInputProperties) in lowestCostTable.
      */
     public boolean updateLowestCostTable(
-            PhysicalProperties parentOutputProperties,
+            PhysicalProperties outputProperties,
             List<PhysicalProperties> childrenInputProperties,
             double cost) {
-        if (lowestCostTable.containsKey(parentOutputProperties)) {
-            if (lowestCostTable.get(parentOutputProperties).first > cost) {
-                lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
+        if (lowestCostTable.containsKey(outputProperties)) {
+            if (lowestCostTable.get(outputProperties).first > cost) {
+                lowestCostTable.put(outputProperties, new Pair<>(cost, childrenInputProperties));
                 return true;
             }
         } else {
-            lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
+            lowestCostTable.put(outputProperties, new Pair<>(cost, childrenInputProperties));
             return true;
         }
         return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
new file mode 100644
index 0000000000..5f6b96d82d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+
+/**
+ * Used for property drive.
+ */
+public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, PlanContext> {
+    /*
+     *   parentPlanNode
+     *         ▲
+     *         │
+     * childOutputProperty
+     */
+    PhysicalProperties requestProperty;
+    List<PhysicalProperties> childrenOutputProperties;
+
+    public ChildOutputPropertyDeriver(PhysicalProperties requestProperty,
+            List<PhysicalProperties> childrenOutputProperties) {
+        this.childrenOutputProperties = childrenOutputProperties;
+        this.requestProperty = requestProperty;
+    }
+
+    public static PhysicalProperties getProperties(
+            PhysicalProperties requirements,
+            List<PhysicalProperties> childrenOutputProperties,
+            GroupExpression groupExpression) {
+
+        ChildOutputPropertyDeriver childOutputPropertyDeriver = new ChildOutputPropertyDeriver(requirements,
+                childrenOutputProperties);
+
+        return groupExpression.getPlan().accept(childOutputPropertyDeriver, new PlanContext(groupExpression));
+    }
+
+    public PhysicalProperties getRequestProperty() {
+        return requestProperty;
+    }
+
+    @Override
+    public PhysicalProperties visit(Plan plan, PlanContext context) {
+        return new PhysicalProperties();
+    }
+
+    @Override
+    public PhysicalProperties visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> hashJoin, PlanContext context) {
+        Preconditions.checkState(childrenOutputProperties.size() == 2);
+        PhysicalProperties leftOutputProperty = childrenOutputProperties.get(0);
+        PhysicalProperties rightOutputProperty = childrenOutputProperties.get(1);
+
+        // broadcast
+        if (rightOutputProperty.getDistributionSpec() instanceof DistributionSpecReplicated) {
+            // TODO
+            return leftOutputProperty;
+        }
+
+        // shuffle
+        // List<SlotReference> leftSlotRefs = hashJoin.left().getOutput().stream().map(slot -> (SlotReference) slot)
+        //        .collect(Collectors.toList());
+        // List<SlotReference> rightSlotRefs = hashJoin.right().getOutput().stream().map(slot -> (SlotReference) slot)
+        //                .collect(Collectors.toList());
+
+        //        List<SlotReference> leftOnSlotRefs;
+        //        List<SlotReference> rightOnSlotRefs;
+        //        Preconditions.checkState(leftOnSlotRefs.size() == rightOnSlotRefs.size());
+        DistributionSpec leftDistribution = leftOutputProperty.getDistributionSpec();
+        DistributionSpec rightDistribution = rightOutputProperty.getDistributionSpec();
+        if (!(leftDistribution instanceof DistributionSpecHash)
+                || !(rightDistribution instanceof DistributionSpecHash)) {
+            Preconditions.checkState(false, "error");
+            return new PhysicalProperties();
+        }
+
+        return leftOutputProperty;
+    }
+
+    @Override
+    public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanContext context) {
+        return olapScan.getPhysicalProperties();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java
deleted file mode 100644
index 8b991fea1f..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.properties;
-
-import org.apache.doris.nereids.PlanContext;
-import org.apache.doris.nereids.memo.GroupExpression;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-
-import java.util.List;
-
-/**
- * Used for property drive.
- */
-public class ChildrenOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, PlanContext> {
-    PhysicalProperties requirements;
-    List<PhysicalProperties> childrenOutputProperties;
-
-    public ChildrenOutputPropertyDeriver(PhysicalProperties requirements,
-            List<PhysicalProperties> childrenOutputProperties) {
-        this.childrenOutputProperties = childrenOutputProperties;
-        this.requirements = requirements;
-    }
-
-    public static PhysicalProperties getProperties(
-            PhysicalProperties requirements,
-            List<PhysicalProperties> childrenOutputProperties,
-            GroupExpression groupExpression) {
-
-        ChildrenOutputPropertyDeriver childrenOutputPropertyDeriver = new ChildrenOutputPropertyDeriver(requirements,
-                childrenOutputProperties);
-
-        return groupExpression.getPlan().accept(childrenOutputPropertyDeriver, new PlanContext(groupExpression));
-    }
-
-    public PhysicalProperties getRequirements() {
-        return requirements;
-    }
-
-    //    public List<List<PhysicalProperties>> getProperties(GroupExpression groupExpression) {
-    //        properties = Lists.newArrayList();
-    //        groupExpression.getPlan().accept(this, new PlanContext(groupExpression));
-    //        return properties;
-    //    }
-
-    //    @Override
-    //    public Void visit(Plan plan, PlanContext context) {
-    //        List<PhysicalProperties> props = Lists.newArrayList();
-    //        for (int childIndex = 0; childIndex < context.getGroupExpression().arity(); ++childIndex) {
-    //            props.add(new PhysicalProperties());
-    //        }
-    //        properties.add(props);
-    //        return null;
-    //    }
-    @Override
-    public PhysicalProperties visit(Plan plan, PlanContext context) {
-        return new PhysicalProperties();
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
index e849a1396f..d5568de293 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
@@ -21,52 +21,48 @@ import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribution;
-import org.apache.doris.planner.DataPartition;
 
 import com.google.common.collect.Lists;
 
 /**
  * Spec of data distribution.
+ * GPORCA has more type in CDistributionSpec.
  */
-public class DistributionSpec {
-
-    private DataPartition dataPartition;
-
-    // TODO: why exist?
-    public DistributionSpec() {
-    }
-
-    public DistributionSpec(DataPartition dataPartition) {
-        this.dataPartition = dataPartition;
-    }
-
+public abstract class DistributionSpec {
     /**
-     * TODO: need read ORCA.
-     * Whether other `DistributionSpec` is satisfied the current `DistributionSpec`.
-     *
-     * @param other another DistributionSpec.
+     * Self satisfies other DistributionSpec.
+     * Example:
+     * `DistributionSpecGather` satisfies `DistributionSpecAny`
      */
-    public boolean meet(DistributionSpec other) {
-        return false;
-    }
-
-    public DataPartition getDataPartition() {
-        return dataPartition;
-    }
-
-    public void setDataPartition(DataPartition dataPartition) {
-        this.dataPartition = dataPartition;
-    }
+    public abstract boolean satisfy(DistributionSpec other);
 
+    /**
+     * Add physical operator of enforcer.
+     */
     public GroupExpression addEnforcer(Group child) {
+        // TODO:maybe we need to new a LogicalProperties or just do not set logical properties for this node.
+        // If we don't set LogicalProperties explicitly, node will compute a applicable LogicalProperties for itself.
         PhysicalDistribution distribution = new PhysicalDistribution(
-                new DistributionSpec(dataPartition), child.getLogicalProperties(), new GroupPlan(child));
+                this,
+                child.getLogicalProperties(),
+                new GroupPlan(child));
         return new GroupExpression(distribution, Lists.newArrayList(child));
     }
 
-    // TODO
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        return true;
+    }
+
     @Override
-    public boolean equals(Object obj) {
-        return super.equals(obj);
+    public String toString() {
+        return this.getClass().toString();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
similarity index 65%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
index ade20758ff..0457b8204b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
@@ -17,25 +17,23 @@
 
 package org.apache.doris.nereids.properties;
 
-import org.apache.doris.analysis.HashDistributionDesc;
-
 /**
- * Describe hash distribution.
+ * Data can be anywhere on the segments (required only).
  */
-public class HashDistributionSpec extends DistributionSpec {
+public class DistributionSpecAny extends DistributionSpec {
 
-    /**
-     * Enums for concrete shuffle type.
-     */
-    public enum ShuffleType {
-        COLOCATE,
-        BUCKET,
-        AGG,
-        NORMAL
-    }
+    private static DistributionSpecAny instance = new DistributionSpecAny();
 
-    private ShuffleType shuffleType;
+    private DistributionSpecAny() {
+        super();
+    }
 
-    private HashDistributionDesc hashDistributionDesc;
+    public static DistributionSpecAny getInstance() {
+        return instance;
+    }
 
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecAny;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
similarity index 70%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
index f54ed12a19..709a582e0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
@@ -18,8 +18,16 @@
 package org.apache.doris.nereids.properties;
 
 /**
- * Describe random distribution.
+ * Gather distribution which put all data into one node.
  */
-public class RandomDistributionDesc extends DistributionSpec {
+public class DistributionSpecGather extends DistributionSpec {
 
+    public DistributionSpecGather() {
+        super();
+    }
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecGather || other instanceof DistributionSpecAny;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
new file mode 100644
index 0000000000..fb7cfe22df
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.annotation.Developing;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+
+import java.util.HashSet;
+import java.util.List;
+
+
+/**
+ * Describe hash distribution.
+ */
+@Developing
+public class DistributionSpecHash extends DistributionSpec {
+
+    private final List<SlotReference> shuffledColumns;
+
+    private final ShuffleType shuffleType;
+
+    public DistributionSpecHash(List<SlotReference> shuffledColumns, ShuffleType shuffleType) {
+        // Preconditions.checkState(!shuffledColumns.isEmpty());
+        this.shuffledColumns = shuffledColumns;
+        this.shuffleType = shuffleType;
+    }
+
+    public List<SlotReference> getShuffledColumns() {
+        return shuffledColumns;
+    }
+
+    public ShuffleType getShuffleType() {
+        return shuffleType;
+    }
+
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        if (other instanceof DistributionSpecAny) {
+            return true;
+        }
+
+        if (!(other instanceof DistributionSpecHash)) {
+            return false;
+        }
+
+        DistributionSpecHash spec = (DistributionSpecHash) other;
+
+        if (shuffledColumns.size() > spec.shuffledColumns.size()) {
+            return false;
+        }
+
+        // TODO: need consider following logic whether is right, and maybe need consider more.
+
+        // Current shuffleType is LOCAL/AGG, allow if current is contained by other
+        if (shuffleType == ShuffleType.LOCAL && spec.shuffleType == ShuffleType.AGG) {
+            return new HashSet<>(spec.shuffledColumns).containsAll(shuffledColumns);
+        }
+
+        if (shuffleType == ShuffleType.AGG && spec.shuffleType == ShuffleType.JOIN) {
+            return shuffledColumns.size() == spec.shuffledColumns.size()
+                    && shuffledColumns.equals(spec.shuffledColumns);
+        } else if (shuffleType == ShuffleType.JOIN && spec.shuffleType == ShuffleType.AGG) {
+            return new HashSet<>(spec.shuffledColumns).containsAll(shuffledColumns);
+        }
+
+        if (!shuffleType.equals(spec.shuffleType)) {
+            return false;
+        }
+
+        return shuffledColumns.size() == spec.shuffledColumns.size()
+                && shuffledColumns.equals(spec.shuffledColumns);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!super.equals(o)) {
+            return false;
+        }
+        DistributionSpecHash that = (DistributionSpecHash) o;
+        return shuffledColumns.equals(that.shuffledColumns)
+                && shuffleType.equals(that.shuffleType);
+        // && propertyInfo.equals(that.propertyInfo)
+    }
+
+    /**
+     * Enums for concrete shuffle type.
+     */
+    public enum ShuffleType {
+        LOCAL,
+        BUCKET,
+        // Shuffle Aggregation
+        AGG,
+        // Shuffle Join
+        JOIN,
+        ENFORCE
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
similarity index 74%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
index e197ade368..850dc11995 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
@@ -18,11 +18,12 @@
 package org.apache.doris.nereids.properties;
 
 /**
- * Re-shuffle.
+ * Data is replicated across all segments.
+ * Like: broadcast join.
  */
-public class GatherDistributionSpec extends DistributionSpec {
-
-    public GatherDistributionSpec() {
-        super();
+public class DistributionSpecReplicated extends DistributionSpec {
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecReplicated || other instanceof DistributionSpecAny;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
index 54d26f5ca6..73d23cbb27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
@@ -47,13 +47,17 @@ public class EnforceMissingPropertiesHelper {
      */
     public Pair<PhysicalProperties, Double> enforceProperty(PhysicalProperties output, PhysicalProperties required) {
         boolean isMeetOrder = output.getOrderSpec().meet(required.getOrderSpec());
-        boolean isMeetDistribution = output.getDistributionSpec().meet(required.getDistributionSpec());
+        boolean isMeetDistribution = output.getDistributionSpec().satisfy(required.getDistributionSpec());
 
         if (!isMeetDistribution && !isMeetOrder) {
+            // Both Distribution and Order don't satisfy.
             return new Pair<>(enforceSortAndDistribution(output, required), curTotalCost);
         } else if (isMeetDistribution && isMeetOrder) {
+            // Both satisfy.
+            // TODO: can't reach here.
             return new Pair<>(null, curTotalCost);
         } else if (!isMeetDistribution) {
+            // Distribution don't satisfy.
             if (required.getOrderSpec().getOrderKeys().isEmpty()) {
                 return new Pair<>(enforceDistribution(output), curTotalCost);
             } else {
@@ -67,15 +71,16 @@ public class EnforceMissingPropertiesHelper {
                 return new Pair<>(enforceDistribution(output), curTotalCost);
             }
         } else {
+            // Order don't satisfy.
             return new Pair<>(enforceSort(output), curTotalCost);
         }
     }
 
     private PhysicalProperties enforceSort(PhysicalProperties oldOutputProperty) {
         // clone
-        PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
-                oldOutputProperty.getOrderSpec());
-        newOutputProperty.setOrderSpec(context.getRequiredProperties().getOrderSpec());
+        PhysicalProperties newOutputProperty = new PhysicalProperties(
+                oldOutputProperty.getDistributionSpec(),
+                context.getRequiredProperties().getOrderSpec());
         GroupExpression enforcer =
                 context.getRequiredProperties().getOrderSpec().addEnforcer(groupExpression.getOwnerGroup());
 
@@ -85,9 +90,9 @@ public class EnforceMissingPropertiesHelper {
     }
 
     private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputProperty) {
-        PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
+        PhysicalProperties newOutputProperty = new PhysicalProperties(
+                context.getRequiredProperties().getDistributionSpec(),
                 oldOutputProperty.getOrderSpec());
-        newOutputProperty.setDistributionSpec(context.getRequiredProperties().getDistributionSpec());
         GroupExpression enforcer =
                 context.getRequiredProperties().getDistributionSpec().addEnforcer(groupExpression.getOwnerGroup());
 
@@ -112,7 +117,7 @@ public class EnforceMissingPropertiesHelper {
             PhysicalProperties requiredProperty) {
         PhysicalProperties enforcedProperty;
         if (requiredProperty.getDistributionSpec()
-                .equals(new GatherDistributionSpec())) {
+                .equals(new DistributionSpecGather())) {
             enforcedProperty = enforceSort(outputProperty);
             enforcedProperty = enforceDistribution(enforcedProperty);
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
index 196bbf0cc6..8b0f38aa0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
@@ -32,6 +32,10 @@ import java.util.List;
 public class OrderSpec {
     private final List<OrderKey> orderKeys;
 
+    public OrderSpec() {
+        this.orderKeys = Lists.newArrayList();
+    }
+
     public OrderSpec(List<OrderKey> orderKeys) {
         this.orderKeys = orderKeys;
     }
@@ -45,6 +49,7 @@ public class OrderSpec {
         if (this.orderKeys.size() < other.getOrderKeys().size()) {
             return false;
         }
+
         for (int i = 0; i < other.getOrderKeys().size(); ++i) {
             if (!this.orderKeys.get(i).matches(other.getOrderKeys().get(i))) {
                 return false;
@@ -63,4 +68,16 @@ public class OrderSpec {
     public List<OrderKey> getOrderKeys() {
         return orderKeys;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        OrderSpec that = (OrderSpec) o;
+        return orderKeys.equals(that.orderKeys);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java
deleted file mode 100644
index f78fd2c601..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.properties;
-
-import org.apache.doris.nereids.PlanContext;
-import org.apache.doris.nereids.jobs.JobContext;
-import org.apache.doris.nereids.memo.GroupExpression;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-/**
- * Used for parent property drive.
- */
-public class ParentRequiredPropertyDeriver extends PlanVisitor<Void, PlanContext> {
-
-    PhysicalProperties requestPropertyFromParent;
-    List<List<PhysicalProperties>> requiredPropertyListList;
-
-    public ParentRequiredPropertyDeriver(JobContext context) {
-        this.requestPropertyFromParent = context.getRequiredProperties();
-    }
-
-    public List<List<PhysicalProperties>> getRequiredPropertyListList(GroupExpression groupExpression) {
-        requiredPropertyListList = Lists.newArrayList();
-        groupExpression.getPlan().accept(this, new PlanContext(groupExpression));
-        return requiredPropertyListList;
-    }
-
-    @Override
-    public Void visit(Plan plan, PlanContext context) {
-        List<PhysicalProperties> requiredPropertyList = Lists.newArrayList();
-        for (int i = 0; i < context.getGroupExpression().arity(); i++) {
-            requiredPropertyList.add(new PhysicalProperties());
-        }
-        requiredPropertyListList.add(requiredPropertyList);
-        return null;
-    }
-
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 75b02d7945..feb448b33b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -22,11 +22,23 @@ package org.apache.doris.nereids.properties;
  * TODO(wj): Do we need to `PhysicalPropertySpec` Interface like NoisePage?
  */
 public class PhysicalProperties {
-    private OrderSpec orderSpec;
+    private final OrderSpec orderSpec;
 
-    private DistributionSpec distributionSpec;
+    private final DistributionSpec distributionSpec;
 
     public PhysicalProperties() {
+        this.orderSpec = new OrderSpec();
+        this.distributionSpec = DistributionSpecAny.getInstance();
+    }
+
+    public PhysicalProperties(DistributionSpec distributionSpec) {
+        this.distributionSpec = distributionSpec;
+        this.orderSpec = new OrderSpec();
+    }
+
+    public PhysicalProperties(OrderSpec orderSpec) {
+        this.orderSpec = orderSpec;
+        this.distributionSpec = DistributionSpecAny.getInstance();
     }
 
     public PhysicalProperties(DistributionSpec distributionSpec, OrderSpec orderSpec) {
@@ -34,25 +46,29 @@ public class PhysicalProperties {
         this.orderSpec = orderSpec;
     }
 
+    // Current properties satisfies other properties.
     public boolean meet(PhysicalProperties other) {
-        // TODO: handle distributionSpec meet()
-        return orderSpec.meet(other.orderSpec) && distributionSpec.meet(other.distributionSpec);
+        return orderSpec.meet(other.orderSpec) && distributionSpec.satisfy(other.distributionSpec);
     }
 
-
     public OrderSpec getOrderSpec() {
         return orderSpec;
     }
 
-    public void setOrderSpec(OrderSpec orderSpec) {
-        this.orderSpec = orderSpec;
-    }
-
     public DistributionSpec getDistributionSpec() {
         return distributionSpec;
     }
 
-    public void setDistributionSpec(DistributionSpec distributionSpec) {
-        this.distributionSpec = distributionSpec;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PhysicalProperties that = (PhysicalProperties) o;
+        return orderSpec.equals(that.orderSpec)
+                && distributionSpec.equals(that.distributionSpec);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
new file mode 100644
index 0000000000..848ed44f9d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.JoinUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Used for parent property drive.
+ */
+public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
+    /*
+     * requestPropertyFromParent
+     *             │
+     *             ▼
+     *          curNode (current plan node in current CostAndEnforcerJob)
+     *             │
+     *             ▼
+     * requestPropertyToChildren
+     */
+    private PhysicalProperties requestPropertyFromParent;
+    private List<List<PhysicalProperties>> requestPropertyToChildren;
+
+    public RequestPropertyDeriver(JobContext context) {
+        this.requestPropertyFromParent = context.getRequiredProperties();
+    }
+
+    public List<List<PhysicalProperties>> getRequiredPropertyListList(GroupExpression groupExpression) {
+        requestPropertyToChildren = Lists.newArrayList();
+        groupExpression.getPlan().accept(this, new PlanContext(groupExpression));
+        return requestPropertyToChildren;
+    }
+
+    @Override
+    public Void visit(Plan plan, PlanContext context) {
+        List<PhysicalProperties> requiredPropertyList = Lists.newArrayList();
+        for (int i = 0; i < context.getGroupExpression().arity(); i++) {
+            requiredPropertyList.add(new PhysicalProperties());
+        }
+        requestPropertyToChildren.add(requiredPropertyList);
+        return null;
+    }
+
+    @Override
+    public Void visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> hashJoin, PlanContext context) {
+        // for broadcast join
+        List<PhysicalProperties> propertiesForBroadcast = Lists.newArrayList(
+                new PhysicalProperties(),
+                new PhysicalProperties(new DistributionSpecReplicated())
+        );
+        // for shuffle join
+        Pair<List<SlotReference>, List<SlotReference>> onClauseUsedSlots = JoinUtils.getOnClauseUsedSlots(hashJoin);
+        List<PhysicalProperties> propertiesForShuffle = Lists.newArrayList(
+                new PhysicalProperties(new DistributionSpecHash(onClauseUsedSlots.first, ShuffleType.JOIN)),
+                new PhysicalProperties(new DistributionSpecHash(onClauseUsedSlots.second, ShuffleType.JOIN)));
+
+        if (!JoinUtils.onlyBroadcast(hashJoin)) {
+            requestPropertyToChildren.add(propertiesForShuffle);
+        }
+        if (!JoinUtils.onlyShuffle(hashJoin)) {
+            requestPropertyToChildren.add(propertiesForBroadcast);
+        }
+
+        return null;
+    }
+
+    protected static List<PhysicalProperties> computeShuffleJoinRequiredProperties(
+            PhysicalProperties requestedProperty, List<SlotReference> leftShuffleColumns,
+            List<SlotReference> rightShuffleColumns) {
+
+        // requestedProperty type isn't SHUFFLE_JOIN,
+        if (!(requestedProperty.getDistributionSpec() instanceof DistributionSpecHash
+                && ((DistributionSpecHash) requestedProperty.getDistributionSpec()).getShuffleType()
+                == ShuffleType.JOIN)) {
+            return Lists.newArrayList(
+                    new PhysicalProperties(new DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+                    new PhysicalProperties(new DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
+        }
+
+        // adjust the required property shuffle columns based on the column order required by parent
+        DistributionSpecHash distributionSpec = (DistributionSpecHash) requestedProperty.getDistributionSpec();
+        List<SlotReference> requestedColumns = distributionSpec.getShuffledColumns();
+
+        boolean adjustBasedOnLeft = Utils.equalsIgnoreOrder(leftShuffleColumns, requestedColumns);
+        boolean adjustBasedOnRight = Utils.equalsIgnoreOrder(rightShuffleColumns, requestedColumns);
+        if (!adjustBasedOnLeft && !adjustBasedOnRight) {
+            return Lists.newArrayList(
+                    new PhysicalProperties(new DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+                    new PhysicalProperties(new DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
+        }
+
+        return Lists.newArrayList(
+                new PhysicalProperties(new DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+                new PhysicalProperties(new DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
+
+    }
+
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscom.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscom.java
index 031ac28c91..26fce95acc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscom.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscom.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.util.ExpressionUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -107,7 +108,7 @@ public class JoinLAsscom extends OneExplorationRuleFactory {
             List<Expression> newTopJoinOnCondition = Lists.newArrayList();
             for (Expression onCondition : allOnCondition) {
                 List<SlotReference> slots = onCondition.collect(SlotReference.class::isInstance);
-                if (ExpressionUtils.containsAll(newBottomJoinSlots, slots)) {
+                if (new HashSet<>(newBottomJoinSlots).containsAll(slots)) {
                     newBottomJoinOnCondition.add(onCondition);
                 } else {
                     newTopJoinOnCondition.add(onCondition);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinProjectLAsscom.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinProjectLAsscom.java
index 7f5500d4cc..74dfa353cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinProjectLAsscom.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinProjectLAsscom.java
@@ -33,6 +33,7 @@ import org.apache.doris.nereids.util.ExpressionUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -115,7 +116,7 @@ public class JoinProjectLAsscom extends OneExplorationRuleFactory {
                 List<Expression> newTopJoinOnCondition = Lists.newArrayList();
                 for (Expression onCondition : allOnCondition) {
                     List<SlotReference> slots = onCondition.collect(SlotReference.class::isInstance);
-                    if (ExpressionUtils.containsAll(newBottomJoinSlots, slots)) {
+                    if (new HashSet<>(newBottomJoinSlots).containsAll(slots)) {
                         newBottomJoinOnCondition.add(onCondition);
                     } else {
                         newTopJoinOnCondition.add(onCondition);
@@ -143,7 +144,7 @@ public class JoinProjectLAsscom extends OneExplorationRuleFactory {
                 List<NamedExpression> newLeftProjectExpr = Lists.newArrayList();
                 for (NamedExpression projectExpr : projectExprs) {
                     List<SlotReference> usedSlotRefs = projectExpr.collect(SlotReference.class::isInstance);
-                    if (ExpressionUtils.containsAll(bOutputSlots, usedSlotRefs)) {
+                    if (new HashSet<>(bOutputSlots).containsAll(usedSlotRefs)) {
                         newRightProjectExprs.add(projectExpr);
                     } else {
                         newLeftProjectExpr.add(projectExpr);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
index f53edae272..77b7c447dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
@@ -88,27 +88,31 @@ public enum JoinType {
         return this == CROSS_JOIN;
     }
 
-    public final boolean isInnerOrOuterOrCrossJoin() {
-        return this == INNER_JOIN || this == CROSS_JOIN || this == FULL_OUTER_JOIN;
+    public final boolean isInnerJoin() {
+        return this == INNER_JOIN;
     }
 
-    public final boolean isSwapJoinType() {
-        return joinSwapMap.containsKey(this);
+    public final boolean isRightJoin() {
+        return this == RIGHT_OUTER_JOIN;
     }
 
-    public JoinType swap() {
-        return joinSwapMap.get(this);
+    public final boolean isFullOuterJoin() {
+        return this == FULL_OUTER_JOIN;
     }
 
-    public boolean isSemiOrAntiJoin() {
+    public final boolean isSemiOrAntiJoin() {
         return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
     }
 
-    public boolean isInnerJoin() {
-        return this == INNER_JOIN;
+    public final boolean isOuterJoin() {
+        return this == LEFT_OUTER_JOIN || this == RIGHT_OUTER_JOIN || this == FULL_OUTER_JOIN;
     }
 
-    public boolean isOuterJoin() {
-        return this == LEFT_OUTER_JOIN || this == RIGHT_OUTER_JOIN || this == FULL_OUTER_JOIN;
+    public final boolean isSwapJoinType() {
+        return joinSwapMap.containsKey(this);
+    }
+
+    public JoinType swap() {
+        return joinSwapMap.get(this);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
index 3641fd7a4c..89218c25f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
@@ -135,28 +135,4 @@ public class ExpressionUtils {
         }
         return false;
     }
-
-    /**
-     * Whether `List of SlotReference` contains a `SlotReference`.
-     */
-    public static boolean contains(List<SlotReference> list, SlotReference item) {
-        for (SlotReference slotRefInList : list) {
-            if (item.equals(slotRefInList)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Whether `List of SlotReference` contains all another `List of SlotReference`.
-     */
-    public static boolean containsAll(List<SlotReference> large, List<SlotReference> small) {
-        for (SlotReference slotRefInSmall : small) {
-            if (!contains(large, slotRefInSmall)) {
-                return false;
-            }
-        }
-        return true;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
new file mode 100644
index 0000000000..04ccafcaee
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.util;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utils for join
+ */
+public class JoinUtils {
+    public static boolean onlyBroadcast(PhysicalHashJoin join) {
+        // Cross-join only can be broadcast join.
+        return join.getJoinType().isCrossJoin();
+    }
+
+    public static boolean onlyShuffle(PhysicalHashJoin join) {
+        return join.getJoinType().isRightJoin() || join.getJoinType().isFullOuterJoin();
+    }
+
+    /**
+     * Get all equalTo from onClause of join
+     */
+    public static List<EqualTo> getEqualTo(PhysicalHashJoin<Plan, Plan> join) {
+        List<EqualTo> eqConjuncts = Lists.newArrayList();
+        if (!join.getCondition().isPresent()) {
+            return eqConjuncts;
+        }
+
+        List<SlotReference> leftSlots = join.left().getOutput().stream().map(slot -> (SlotReference) slot)
+                .collect(Collectors.toList());
+        List<SlotReference> rightSlots = join.right().getOutput().stream().map(slot -> (SlotReference) slot)
+                .collect(Collectors.toList());
+
+        Expression onCondition = join.getCondition().get();
+        List<Expression> conjunctList = ExpressionUtils.extractConjunctive(onCondition);
+        for (Expression predicate : conjunctList) {
+            if (isEqualTo(leftSlots, rightSlots, predicate)) {
+                eqConjuncts.add((EqualTo) predicate);
+            }
+        }
+        return eqConjuncts;
+    }
+
+    private static boolean isEqualTo(List<SlotReference> leftSlots, List<SlotReference> rightSlots,
+            Expression predicate) {
+        if (!(predicate instanceof EqualTo)) {
+            return false;
+        }
+
+        EqualTo equalTo = (EqualTo) predicate;
+        List<SlotReference> leftUsed = equalTo.left().collect(SlotReference.class::isInstance);
+        List<SlotReference> rightUsed = equalTo.right().collect(SlotReference.class::isInstance);
+        if (leftUsed.isEmpty() || rightUsed.isEmpty()) {
+            return false;
+        }
+
+        return Utils.equalsIgnoreOrder(leftUsed, leftSlots) || Utils.equalsIgnoreOrder(rightUsed, rightSlots);
+    }
+
+    /**
+     * Get all used slots from onClause of join.
+     * Return pair of left used slots and right used slots.
+     */
+    public static Pair<List<SlotReference>, List<SlotReference>> getOnClauseUsedSlots(
+            PhysicalHashJoin<Plan, Plan> join) {
+        Pair<List<SlotReference>, List<SlotReference>> childSlots =
+                new Pair<>(Lists.newArrayList(), Lists.newArrayList());
+
+        List<SlotReference> leftSlots = join.left().getOutput().stream().map(slot -> (SlotReference) slot)
+                .collect(Collectors.toList());
+        List<SlotReference> rightSlots = join.right().getOutput().stream().map(slot -> (SlotReference) slot)
+                .collect(Collectors.toList());
+        List<EqualTo> equalToList = getEqualTo(join);
+
+
+        for (EqualTo equalTo : equalToList) {
+            List<SlotReference> leftOnSlots = equalTo.left().collect(SlotReference.class::isInstance);
+            List<SlotReference> rightOnSlots = equalTo.right().collect(SlotReference.class::isInstance);
+
+            if (new HashSet<>(leftSlots).containsAll(leftOnSlots)
+                    && new HashSet<>(rightSlots).containsAll(rightOnSlots)) {
+                // TODO: need rethink about `.get(0)`
+                childSlots.first.add(leftOnSlots.get(0));
+                childSlots.second.add(rightOnSlots.get(0));
+            } else if (new HashSet<>(leftSlots).containsAll(rightOnSlots)
+                    && new HashSet<>(rightSlots).containsAll(leftOnSlots)) {
+                childSlots.first.add(rightOnSlots.get(0));
+                childSlots.second.add(leftOnSlots.get(0));
+            } else {
+                Preconditions.checkState(false, "error");
+            }
+        }
+
+        return childSlots;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
index 2500283e44..a5bc161570 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.util;
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.HashSet;
 import java.util.List;
 
 /**
@@ -95,4 +96,15 @@ public class Utils {
     public static String qualifiedName(List<String> qualifier, String name) {
         return StringUtils.join(qualifiedNameParts(qualifier, name), ".");
     }
+
+
+    /**
+     * equals for List but ignore order.
+     */
+    public static <E> boolean equalsIgnoreOrder(List<E> one, List<E> other) {
+        if (one.size() != other.size()) {
+            return false;
+        }
+        return new HashSet<>(one).containsAll(other) && new HashSet<>(other).containsAll(one);
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java
new file mode 100644
index 0000000000..d96f42b054
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.nereids.PlannerContext;
+import org.apache.doris.nereids.cost.CostCalculator;
+import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.memo.Memo;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.types.IntegerType;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+public class CostAndEnforcerJobTest {
+    /*
+     *   topJoin
+     *   /     \
+     *  C   bottomJoin
+     *        /    \
+     *       A      B
+     */
+    @Test
+    public void testExecute(@Mocked LogicalProperties logicalProperties) {
+        new MockUp<CostCalculator>() {
+            @Mock
+            public double calculateCost(GroupExpression groupExpression) {
+                return 0;
+            }
+        };
+
+        OlapTable aOlapTable = new OlapTable(0L, "a",
+                ImmutableList.of(new Column("id", Type.INT, true, AggregateType.NONE, "0", ""),
+                        new Column("name", Type.STRING, true, AggregateType.NONE, "", "")),
+                KeysType.PRIMARY_KEYS,
+                null, null);
+        OlapTable bOlapTable = new OlapTable(0L, "b",
+                ImmutableList.of(new Column("id", Type.INT, true, AggregateType.NONE, "0", ""),
+                        new Column("name", Type.STRING, true, AggregateType.NONE, "", "")),
+                KeysType.PRIMARY_KEYS,
+                null, null);
+        PhysicalOlapScan aScan = new PhysicalOlapScan(aOlapTable, Lists.newArrayList("a"), Optional.empty(),
+                logicalProperties);
+        PhysicalOlapScan bScan = new PhysicalOlapScan(bOlapTable, Lists.newArrayList("b"), Optional.empty(),
+                logicalProperties);
+
+
+        OlapTable cOlapTable = new OlapTable(0L, "c",
+                ImmutableList.of(new Column("id", Type.INT, true, AggregateType.NONE, "0", ""),
+                        new Column("name", Type.STRING, true, AggregateType.NONE, "", "")),
+                KeysType.PRIMARY_KEYS,
+                null, null);
+        PhysicalPlan cScan = new PhysicalOlapScan(cOlapTable, Lists.newArrayList("c"), Optional.empty(),
+                logicalProperties);
+
+        Expression bottomJoinOnCondition = new EqualTo(
+                new SlotReference("id", new IntegerType(), true, ImmutableList.of("a")),
+                new SlotReference("id", new IntegerType(), true, ImmutableList.of("b")));
+        Expression topJoinOnCondition = new EqualTo(
+                new SlotReference("id", new IntegerType(), true, ImmutableList.of("a")),
+                new SlotReference("id", new IntegerType(), true, ImmutableList.of("c")));
+
+        PhysicalHashJoin bottomJoin = new PhysicalHashJoin<>(JoinType.INNER_JOIN,
+                Optional.of(bottomJoinOnCondition),
+                logicalProperties, aScan, bScan);
+        PhysicalHashJoin topJoin = new PhysicalHashJoin<>(JoinType.INNER_JOIN,
+                Optional.of(topJoinOnCondition),
+                logicalProperties, cScan, bottomJoin);
+
+
+        PlannerContext plannerContext = new Memo(topJoin).newPlannerContext(new ConnectContext())
+                .setDefaultJobContext();
+
+        OptimizeGroupJob optimizeGroupJob = new OptimizeGroupJob(plannerContext.getMemo().getRoot(),
+                plannerContext.getCurrentJobContext());
+        plannerContext.pushJob(optimizeGroupJob);
+        plannerContext.getJobScheduler().executeJobPool(plannerContext);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org