You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/30 12:07:56 UTC

[doris] branch master updated: [feature-wip](nereids) Adjust plan execution flow and fix physical bugs (#10481)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aae619ef2c [feature-wip](nereids) Adjust plan execution flow and fix physical bugs (#10481)
aae619ef2c is described below

commit aae619ef2c21a64df957585ea7d290bfc59fc3b4
Author: zhengshiJ <32...@users.noreply.github.com>
AuthorDate: Thu Jun 30 20:07:48 2022 +0800

    [feature-wip](nereids) Adjust plan execution flow and fix physical bugs (#10481)
    
    Organize the plan process, improve the batch execution of rules and the way to add jobs.
    Fix the problem that the condition in PhysicalHashJoin is empty.
---
 ...calJoinToHashJoin.java => AnalyzeRulesJob.java} | 29 ++++----
 .../org/apache/doris/nereids/BatchRulesJob.java    | 82 ++++++++++++++++++++++
 .../org/apache/doris/nereids/NereidsPlanner.java   | 20 ++++--
 ...alJoinToHashJoin.java => OptimizeRulesJob.java} | 23 +++---
 .../operators/plans/physical/PhysicalHashJoin.java | 14 ++--
 .../org/apache/doris/nereids/rules/RuleSet.java    | 16 ++---
 .../implementation/LogicalJoinToHashJoin.java      |  2 +-
 .../trees/plans/PhysicalPlanTranslator.java        |  2 +-
 8 files changed, 134 insertions(+), 54 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/AnalyzeRulesJob.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/AnalyzeRulesJob.java
index 2683e0fcb6..1f7a01708f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/AnalyzeRulesJob.java
@@ -15,23 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.rules.implementation;
+package org.apache.doris.nereids;
 
-import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.rules.analysis.BindRelation;
+import org.apache.doris.nereids.rules.analysis.BindSlotReference;
+
+import com.google.common.collect.ImmutableList;
 
 /**
- * Implementation rule that convert logical join to physical hash join.
+ * Execute the analyze job.
  */
-public class LogicalJoinToHashJoin extends OneImplementationRuleFactory {
-    @Override
-    public Rule<Plan> build() {
-        return logicalJoin().then(join -> plan(
-            new PhysicalHashJoin(join.operator.getJoinType(), join.operator.getCondition().get()),
-            join.getLogicalProperties(),
-            join.left(), join.right()
-        )).toRule(RuleType.LOGICAL_JOIN_TO_HASH_JOIN_RULE);
+public class AnalyzeRulesJob extends BatchRulesJob {
+
+    AnalyzeRulesJob(PlannerContext plannerContext) {
+        super(plannerContext);
+        rulesJob.addAll(ImmutableList.of(
+                bottomUpBatch(ImmutableList.of(
+                        new BindRelation(),
+                        new BindSlotReference())
+                )));
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/BatchRulesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/BatchRulesJob.java
new file mode 100644
index 0000000000..bc68d92fe9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/BatchRulesJob.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.jobs.Job;
+import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
+import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
+import org.apache.doris.nereids.jobs.rewrite.RewriteTopDownJob;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleFactory;
+import org.apache.doris.nereids.trees.plans.Plan;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for executing all jobs.
+ *
+ * Each batch of rules will be uniformly executed.
+ */
+public class BatchRulesJob {
+    protected PlannerContext plannerContext;
+    protected List<Job<Plan>> rulesJob = new ArrayList<>();
+
+    BatchRulesJob(PlannerContext plannerContext) {
+        this.plannerContext = Objects.requireNonNull(plannerContext, "plannerContext can not null");
+    }
+
+    protected Job<Plan> bottomUpBatch(List<RuleFactory> ruleFactories) {
+        List<Rule<Plan>> rules = new ArrayList<>();
+        for (RuleFactory ruleFactory : ruleFactories) {
+            rules.add((Rule<Plan>) ruleFactory.buildRules());
+        }
+        Collections.reverse(rules);
+        return new RewriteBottomUpJob(
+                plannerContext.getOptimizerContext().getMemo().getRoot(),
+                rules,
+                plannerContext);
+    }
+
+    protected Job<Plan> topDownBatch(List<RuleFactory> ruleFactories) {
+        List<Rule<Plan>> rules = new ArrayList<>();
+        for (RuleFactory ruleFactory : ruleFactories) {
+            rules.add((Rule<Plan>) ruleFactory.buildRules());
+        }
+        Collections.reverse(rules);
+        return new RewriteTopDownJob(
+                plannerContext.getOptimizerContext().getMemo().getRoot(),
+                rules,
+                plannerContext);
+    }
+
+    protected Job<Plan> optimize() {
+        return new OptimizeGroupJob(
+                plannerContext.getOptimizerContext().getMemo().getRoot(),
+                plannerContext);
+    }
+
+    public void execute() {
+        for (Job job : rulesJob) {
+            plannerContext.getOptimizerContext().pushJob(job);
+            plannerContext.getOptimizerContext().getJobScheduler().executeJobPool(plannerContext);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index ee8b9edc16..68797c57e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -21,8 +21,6 @@ import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
-import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
-import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
 import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.memo.Memo;
@@ -94,13 +92,21 @@ public class NereidsPlanner extends Planner {
         OptimizerContext optimizerContext = new OptimizerContext(memo);
         plannerContext = new PlannerContext(optimizerContext, connectContext, outputProperties);
 
-        plannerContext.getOptimizerContext().pushJob(
-                new RewriteBottomUpJob(getRoot(), optimizerContext.getRuleSet().getAnalysisRules(), plannerContext));
+        // Get plan directly. Just for SSB.
+        return doPlan();
+    }
+
+    /**
+     * The actual execution of the plan, including the generation and execution of the job.
+     * @return PhysicalPlan.
+     */
+    private PhysicalPlan doPlan() {
+        AnalyzeRulesJob analyzeRulesJob = new AnalyzeRulesJob(plannerContext);
+        analyzeRulesJob.execute();
 
-        plannerContext.getOptimizerContext().pushJob(new OptimizeGroupJob(getRoot(), plannerContext));
-        plannerContext.getOptimizerContext().getJobScheduler().executeJobPool(plannerContext);
+        OptimizeRulesJob optimizeRulesJob = new OptimizeRulesJob(plannerContext);
+        optimizeRulesJob.execute();
 
-        // Get plan directly. Just for SSB.
         return getRoot().extractPlan();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/OptimizeRulesJob.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/OptimizeRulesJob.java
index 2683e0fcb6..80ed619a16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/OptimizeRulesJob.java
@@ -15,23 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.rules.implementation;
+package org.apache.doris.nereids;
 
-import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.trees.plans.Plan;
+import com.google.common.collect.ImmutableList;
 
 /**
- * Implementation rule that convert logical join to physical hash join.
+ * cascade optimizer added.
  */
-public class LogicalJoinToHashJoin extends OneImplementationRuleFactory {
-    @Override
-    public Rule<Plan> build() {
-        return logicalJoin().then(join -> plan(
-            new PhysicalHashJoin(join.operator.getJoinType(), join.operator.getCondition().get()),
-            join.getLogicalProperties(),
-            join.left(), join.right()
-        )).toRule(RuleType.LOGICAL_JOIN_TO_HASH_JOIN_RULE);
+public class OptimizeRulesJob extends BatchRulesJob {
+    OptimizeRulesJob(PlannerContext plannerContext) {
+        super(plannerContext);
+        rulesJob.addAll(ImmutableList.of(
+                optimize()
+        ));
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java
index 567ca328d6..d8ca408a13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java
@@ -27,6 +27,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalBinaryPlan;
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Physical hash join plan operator.
@@ -35,7 +37,7 @@ public class PhysicalHashJoin extends PhysicalBinaryOperator {
 
     private final JoinType joinType;
 
-    private final Expression condition;
+    private final Optional<Expression> condition;
 
     /**
      * Constructor of PhysicalHashJoinNode.
@@ -43,17 +45,17 @@ public class PhysicalHashJoin extends PhysicalBinaryOperator {
      * @param joinType Which join type, left semi join, inner join...
      * @param predicate join condition.
      */
-    public PhysicalHashJoin(JoinType joinType, Expression predicate) {
+    public PhysicalHashJoin(JoinType joinType, Optional<Expression> predicate) {
         super(OperatorType.PHYSICAL_HASH_JOIN);
-        this.joinType = joinType;
-        this.condition = predicate;
+        this.joinType = Objects.requireNonNull(joinType, "joinType can not be null");
+        this.condition = Objects.requireNonNull(predicate, "predicate can not be null");
     }
 
     public JoinType getJoinType() {
         return joinType;
     }
 
-    public Expression getCondition() {
+    public Optional<Expression> getCondition() {
         return condition;
     }
 
@@ -64,6 +66,6 @@ public class PhysicalHashJoin extends PhysicalBinaryOperator {
 
     @Override
     public List<Expression> getExpressions() {
-        return ImmutableList.of(condition);
+        return condition.<List<Expression>>map(ImmutableList::of).orElseGet(ImmutableList::of);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index fda9440e36..55ff1121c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.nereids.rules;
 
-import org.apache.doris.nereids.rules.analysis.BindRelation;
 import org.apache.doris.nereids.rules.exploration.join.JoinCommutative;
 import org.apache.doris.nereids.rules.exploration.join.JoinLeftAssociative;
 import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
@@ -35,10 +34,6 @@ import java.util.List;
  * Containers for set of different type rules.
  */
 public class RuleSet {
-    public static final List<Rule<Plan>> ANALYSIS_RULES = planRuleFactories()
-            .add(new BindRelation())
-            .build();
-
     public static final List<Rule<Plan>> EXPLORATION_RULES = planRuleFactories()
             .add(new JoinCommutative(false))
             .add(new JoinLeftAssociative())
@@ -50,10 +45,6 @@ public class RuleSet {
             .add(new LogicalFilterToPhysicalFilter())
             .build();
 
-    public List<Rule<Plan>> getAnalysisRules() {
-        return ANALYSIS_RULES;
-    }
-
     public List<Rule<Plan>> getExplorationRules() {
         return EXPLORATION_RULES;
     }
@@ -62,11 +53,14 @@ public class RuleSet {
         return IMPLEMENTATION_RULES;
     }
 
-    private static RuleFactories<Plan> planRuleFactories() {
+    public static RuleFactories<Plan> planRuleFactories() {
         return new RuleFactories();
     }
 
-    private static class RuleFactories<TYPE extends TreeNode<TYPE>> {
+    /**
+     * generate rule factories.
+     */
+    public static class RuleFactories<TYPE extends TreeNode<TYPE>> {
         final Builder<Rule<TYPE>> rules = ImmutableList.builder();
 
         public RuleFactories<TYPE> add(RuleFactory<TYPE> ruleFactory) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java
index 2683e0fcb6..5204b37467 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJoinToHashJoin.java
@@ -29,7 +29,7 @@ public class LogicalJoinToHashJoin extends OneImplementationRuleFactory {
     @Override
     public Rule<Plan> build() {
         return logicalJoin().then(join -> plan(
-            new PhysicalHashJoin(join.operator.getJoinType(), join.operator.getCondition().get()),
+            new PhysicalHashJoin(join.operator.getJoinType(), join.operator.getCondition()),
             join.getLogicalProperties(),
             join.left(), join.right()
         )).toRule(RuleType.LOGICAL_JOIN_TO_HASH_JOIN_RULE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java
index 162399d01a..0a9307ae05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java
@@ -207,7 +207,7 @@ public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, Pl
         PlanFragment leftFragment = visit(hashJoin.child(0), context);
         PlanFragment rightFragment = visit(hashJoin.child(0), context);
         PhysicalHashJoin physicalHashJoin = hashJoin.getOperator();
-        Expression predicateExpr = physicalHashJoin.getCondition();
+        Expression predicateExpr = physicalHashJoin.getCondition().get();
         List<Expression> eqExprList = Utils.getEqConjuncts(hashJoin.child(0).getOutput(),
                 hashJoin.child(1).getOutput(), predicateExpr);
         JoinType joinType = physicalHashJoin.getJoinType();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org