You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/07/11 07:02:05 UTC
[doris] branch master updated: [feature](Nereids): cost and enforcer job in cascades. (#10657)
This is an automated email from the ASF dual-hosted git repository.
lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 51855633e4 [feature](Nereids): cost and enforcer job in cascades. (#10657)
51855633e4 is described below
commit 51855633e47b6eb6c7b2618bf16d4d073d073168
Author: jakevin <ja...@gmail.com>
AuthorDate: Mon Jul 11 15:01:59 2022 +0800
[feature](Nereids): cost and enforcer job in cascades. (#10657)
Issue Number: close #9640
Add enforcer job for cascades.
Inspired by the *NoisePage enforcer job* and the *ORCA paper*.
During this phase, we derive physical properties for the plan tree and prune plans according to their cost.
---
.../apache/doris/nereids/cost/CostCalculator.java | 4 +-
.../org/apache/doris/nereids/jobs/JobContext.java | 6 +-
.../nereids/jobs/cascades/CostAndEnforcerJob.java | 189 +++++++++++++++++++++
.../java/org/apache/doris/nereids/memo/Group.java | 31 ++++
.../apache/doris/nereids/memo/GroupExpression.java | 34 ++++
.../java/org/apache/doris/nereids/memo/Memo.java | 7 +
.../doris/nereids/operators/OperatorType.java | 7 +-
.../plans/physical/PhysicalDistribution.java} | 29 ++--
.../properties/ChildrenOutputPropertyDeriver.java | 74 ++++++++
.../doris/nereids/properties/DistributionSpec.java | 29 +++-
.../properties/EnforceMissingPropertiesHelper.java | 125 ++++++++++++++
...Properties.java => GatherDistributionSpec.java} | 15 +-
.../apache/doris/nereids/properties/OrderKey.java | 9 +
.../apache/doris/nereids/properties/OrderSpec.java | 65 +++++++
.../properties/ParentRequiredPropertyDeriver.java | 58 +++++++
.../nereids/properties/PhysicalProperties.java | 35 +++-
16 files changed, 678 insertions(+), 39 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
index 0960dcb703..6e7bfd895e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
@@ -40,14 +40,14 @@ public class CostCalculator {
/**
* Constructor.
*/
- public double calculateCost(GroupExpression groupExpression) {
+ public static double calculateCost(GroupExpression groupExpression) {
PlanContext planContext = new PlanContext(groupExpression);
CostEstimator costCalculator = new CostEstimator();
CostEstimate costEstimate = groupExpression.getOperator().accept(costCalculator, planContext);
return costFormula(costEstimate);
}
- private double costFormula(CostEstimate costEstimate) {
+ private static double costFormula(CostEstimate costEstimate) {
double cpuCostWeight = 1;
double memoryCostWeight = 1;
double networkCostWeight = 1;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java
index 0aee081b9b..62143398e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/JobContext.java
@@ -26,7 +26,7 @@ import org.apache.doris.nereids.properties.PhysicalProperties;
public class JobContext {
private final PlannerContext plannerContext;
private final PhysicalProperties requiredProperties;
- private final double costUpperBound;
+ private double costUpperBound;
public JobContext(PlannerContext plannerContext, PhysicalProperties requiredProperties, double costUpperBound) {
this.plannerContext = plannerContext;
@@ -45,4 +45,8 @@ public class JobContext {
public double getCostUpperBound() {
return costUpperBound;
}
+
+ /**
+ * Set the cost upper bound of this job context; CostAndEnforcerJob lowers it when it finds a cheaper plan.
+ */
+ public void setCostUpperBound(double costUpperBound) {
+ this.costUpperBound = costUpperBound;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
index ee72328afc..5516525233 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
@@ -17,18 +17,49 @@
package org.apache.doris.nereids.jobs.cascades;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.cost.CostCalculator;
import org.apache.doris.nereids.jobs.Job;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.JobType;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.ChildrenOutputPropertyDeriver;
+import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper;
+import org.apache.doris.nereids.properties.ParentRequiredPropertyDeriver;
+import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Optional;
+
/**
* Job to compute cost and add enforcer.
*/
public class CostAndEnforcerJob extends Job<Plan> {
+ // GroupExpression to optimize
private final GroupExpression groupExpression;
+ // Current total cost
+ private double curTotalCost;
+
+ // Properties from parent plan node.
+ // Like: Physical Hash Join
+ // [ [Properties ["", ANY], Properties ["", BROADCAST]],
+ // [Properties ["", SHUFFLE_JOIN], Properties ["", SHUFFLE_JOIN]] ]
+ private List<List<PhysicalProperties>> propertiesListList;
+
+ private List<GroupExpression> childrenBestGroupExprList;
+ private List<PhysicalProperties> childrenOutputProperties = Lists.newArrayList();
+
+ // Current stage of enumeration through child groups
+ private int curChildIndex = -1;
+ // Indicator of last child group that we waited for optimization
+ private int prevChildIndex = -1;
+ // Current stage of enumeration through outputInputProperties
+ private int curPropertyPairIndex = 0;
public CostAndEnforcerJob(GroupExpression groupExpression, JobContext context) {
super(JobType.OPTIMIZE_CHILDREN, context);
@@ -47,4 +78,162 @@ public class CostAndEnforcerJob extends Job<Plan> {
}
}
}
+
+    /**
+     * Run (or resume) cost calculation and property enforcement for {@code groupExpression}.
+     *
+     * <p>The job is resumable: when a child group has no lowest-cost plan for the requested property yet,
+     * a clone of this job is pushed together with an {@link OptimizeGroupJob} for the child, and this
+     * invocation returns. The clone later continues from {@code curPropertyPairIndex}/{@code curChildIndex}.
+     */
+    public void execute1() {
+        // Init logic of the root operator/groupExpr of `subplan`; runs only once per task.
+        // curChildIndex starts at -1, so the guard must test for equality with -1.
+        if (curChildIndex == -1) {
+            curTotalCost = 0;
+
+            // Get the properties the groupExpression operator (root of the subplan) requests from its children.
+            ParentRequiredPropertyDeriver parentRequiredPropertyDeriver = new ParentRequiredPropertyDeriver(context);
+            propertiesListList = parentRequiredPropertyDeriver.getRequiredPropertyListList(groupExpression);
+
+            curChildIndex = 0;
+        }
+
+        for (; curPropertyPairIndex < propertiesListList.size(); curPropertyPairIndex++) {
+            // Properties requested from each child for this alternative.
+            List<PhysicalProperties> childrenInputProperties = propertiesListList.get(curPropertyPairIndex);
+
+            // Add the cost of groupExpression itself only on the first pass over this alternative.
+            if (curChildIndex == 0 && prevChildIndex == -1) {
+                curTotalCost += CostCalculator.calculateCost(groupExpression);
+            }
+
+            for (; curChildIndex < groupExpression.arity(); curChildIndex++) {
+                PhysicalProperties childInputProperties = childrenInputProperties.get(curChildIndex);
+                Group childGroup = groupExpression.child(curChildIndex);
+
+                // Whether the child group was already optimized for this childInputProperties.
+                Optional<Pair<Double, GroupExpression>> lowestCostPlanOpt = childGroup.getLowestCostPlan(
+                        childInputProperties);
+
+                if (!lowestCostPlanOpt.isPresent()) {
+                    // We already waited for this child and it still has no plan: it was cost-pruned.
+                    if (prevChildIndex >= curChildIndex) {
+                        break;
+                    }
+
+                    // This child isn't optimized yet: resume this job after optimizing the child group.
+                    prevChildIndex = curChildIndex;
+                    pushTask((CostAndEnforcerJob) clone());
+                    double newCostUpperBound = context.getCostUpperBound() - curTotalCost;
+                    JobContext jobContext = new JobContext(context.getPlannerContext(), childInputProperties,
+                            newCostUpperBound);
+                    pushTask(new OptimizeGroupJob(childGroup, jobContext));
+                    return;
+                }
+
+                GroupExpression lowestCostExpr = lowestCostPlanOpt.get().second;
+
+                PhysicalProperties childOutputProperty = lowestCostExpr.getPropertyFromMap(childInputProperties);
+                // Record what the child actually outputs; used below to derive this expression's output.
+                childrenOutputProperties.add(childOutputProperty);
+                // TODO: maybe need to record children lowestCostExpr
+                childrenInputProperties.set(curChildIndex, childOutputProperty);
+
+                // TODO: check whether split agg broadcast row count limit.
+
+                curTotalCost += lowestCostExpr.getLowestCostTable().get(childInputProperties).first;
+                if (curTotalCost > context.getCostUpperBound()) {
+                    break;
+                }
+            }
+
+            // All child groups were optimized successfully.
+            if (curChildIndex == groupExpression.arity()) {
+                // No pruning needed here: it was done when fetching the best expr from each child group.
+
+                // TODO: it could update the cost.
+                PhysicalProperties outputProperty = ChildrenOutputPropertyDeriver.getProperties(
+                        context.getRequiredProperties(),
+                        childrenOutputProperties, groupExpression);
+
+                if (curTotalCost > context.getCostUpperBound()) {
+                    break;
+                }
+
+                // Update current group statistics and re-compute costs. Children statistics are needed
+                // to derive them, so stop when any child group has none yet.
+                if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() == null)) {
+                    return;
+                }
+                PlanContext planContext = new PlanContext(groupExpression);
+                // TODO: calculate stats.
+                groupExpression.getParent().setStatistics(planContext.getStatistics());
+
+                enforce(outputProperty, childrenInputProperties);
+            }
+
+            // Reset per-alternative state before trying the next property alternative.
+            childrenOutputProperties.clear();
+            prevChildIndex = -1;
+            curChildIndex = 0;
+            curTotalCost = 0;
+        }
+    }
+
+    /**
+     * Record the cost of {@code groupExpression} for its own output, for ANY, and for the job's required
+     * property. Enforcers are added when the output does not meet the requirement. Finally, tighten the
+     * job's cost upper bound.
+     *
+     * @param outputProperty property derived for groupExpression's output.
+     * @param inputProperties properties of the children used to produce outputProperty.
+     */
+    private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> inputProperties) {
+        // groupExpression can satisfy its own output property
+        putProperty(groupExpression, outputProperty, outputProperty, inputProperties);
+        // groupExpression can satisfy the ANY type output property
+        putProperty(groupExpression, outputProperty, new PhysicalProperties(), inputProperties);
+
+        PhysicalProperties requiredProperties = context.getRequiredProperties();
+        if (!outputProperty.meet(requiredProperties)) {
+            // The output lacks part of the required property: add enforcer(s) on top of it.
+            EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new EnforceMissingPropertiesHelper(
+                    context, groupExpression, curTotalCost);
+            Pair<PhysicalProperties, Double> pair = enforceMissingPropertiesHelper.enforceProperty(outputProperty,
+                    requiredProperties);
+            PhysicalProperties addEnforcedProperty = pair.first;
+            curTotalCost = pair.second;
+
+            // enforcedProperty is superset of requiredProperty
+            if (!addEnforcedProperty.equals(requiredProperties)) {
+                putProperty(groupExpression.getParent().getBestExpression(addEnforcedProperty),
+                        requiredProperties, requiredProperties, Lists.newArrayList(outputProperty));
+            }
+        } else if (!outputProperty.equals(requiredProperties)) {
+            // The output already meets the requirement: groupExpression itself serves it.
+            putProperty(groupExpression, outputProperty, requiredProperties, inputProperties);
+        }
+
+        if (curTotalCost < context.getCostUpperBound()) {
+            context.setCostUpperBound(curTotalCost);
+        }
+    }
+
+    /**
+     * Record that {@code expression} answers a request for {@code requiredProperty} with
+     * {@code outputProperty} at the current total cost. Then offer it to this job's group as the best
+     * plan for {@code requiredProperty}.
+     */
+    private void putProperty(GroupExpression expression,
+            PhysicalProperties outputProperty,
+            PhysicalProperties requiredProperty,
+            List<PhysicalProperties> inputProperties) {
+        boolean improved = expression.updateLowestCostTable(requiredProperty, inputProperties, curTotalCost);
+        if (improved) {
+            // A group expression may produce several output properties (e.g. a join can be shuffle-local
+            // or shuffle-join), so remember which output serves this particular request.
+            expression.putOutputPropertiesMap(outputProperty, requiredProperty);
+        }
+        Group ownerGroup = groupExpression.getParent();
+        ownerGroup.setBestPlan(expression, curTotalCost, requiredProperty);
+    }
+
+
+    /**
+     * Shallow clone: propertiesListList, childrenOutputProperties and groupExpression are shared, not copied.
+     *
+     * @throws IllegalStateException if cloning is not supported; returning null here would only surface
+     *         later, when the null task is pushed and executed.
+     */
+    @Override
+    public Object clone() {
+        try {
+            return (CostAndEnforcerJob) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new IllegalStateException("CostAndEnforcerJob must be cloneable", e);
+        }
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
index d2e3e2e86c..a9343cdd97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.statistics.StatsDeriveResult;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -50,6 +51,7 @@ public class Group {
private double costLowerBound = -1;
private boolean isExplored = false;
private boolean hasCost = false;
+ private StatsDeriveResult statistics;
/**
* Constructor for Group.
@@ -135,6 +137,35 @@ public class Group {
this.costLowerBound = costLowerBound;
}
+ /**
+ * Set or update lowestCostPlans: properties --> new Pair<>(cost, expression)
+ */
+ public void setBestPlan(GroupExpression expression, double cost, PhysicalProperties properties) {
+ if (lowestCostPlans.containsKey(properties)) {
+ if (lowestCostPlans.get(properties).first > cost) {
+ lowestCostPlans.put(properties, new Pair<>(cost, expression));
+ }
+ } else {
+ lowestCostPlans.put(properties, new Pair<>(cost, expression));
+ }
+ }
+
+    /**
+     * Return the expression recorded as lowest-cost for {@code properties}, or null when none is recorded.
+     */
+    public GroupExpression getBestExpression(PhysicalProperties properties) {
+        if (!lowestCostPlans.containsKey(properties)) {
+            return null;
+        }
+        return lowestCostPlans.get(properties).second;
+    }
+
+    /** Statistics derived for this group; null until {@link #setStatistics} is called. */
+    public StatsDeriveResult getStatistics() {
+        return this.statistics;
+    }
+
+    /** Store the statistics derived for this group. */
+    public void setStatistics(StatsDeriveResult statistics) {
+        this.statistics = statistics;
+    }
+
+
public List<GroupExpression> getLogicalExpressions() {
return logicalExpressions;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
index bb7da075cd..da4373de66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
@@ -44,6 +44,8 @@ public class GroupExpression {
// Mapping from output properties to the corresponding best cost, statistics, and child properties.
private final Map<PhysicalProperties, Pair<Double, List<PhysicalProperties>>> lowestCostTable;
+ // Each physical group expression maintains mapping incoming requests to the corresponding child requests.
+ private final Map<PhysicalProperties, PhysicalProperties> requestPropertiesMap;
public GroupExpression(Operator op) {
this(op, Lists.newArrayList());
@@ -61,6 +63,14 @@ public class GroupExpression {
this.ruleMasks = new BitSet(RuleType.SENTINEL.ordinal());
this.statDerived = false;
this.lowestCostTable = Maps.newHashMap();
+ this.requestPropertiesMap = Maps.newHashMap();
+ }
+
+    // TODO: rename
+    /**
+     * Return the output property recorded (via putOutputPropertiesMap) for {@code requiredPropertySet}.
+     *
+     * @throws IllegalStateException if no output property was recorded for it.
+     */
+    public PhysicalProperties getPropertyFromMap(PhysicalProperties requiredPropertySet) {
+        PhysicalProperties outputProperty = requestPropertiesMap.get(requiredPropertySet);
+        Preconditions.checkState(outputProperty != null,
+                "no output property recorded for required property %s", requiredPropertySet);
+        return outputProperty;
}
public int arity() {
@@ -124,6 +134,30 @@ public class GroupExpression {
return lowestCostTable.get(require).second;
}
+    /**
+     * Store (cost, childrenInputProperties) for parentOutputProperties in lowestCostTable when no entry
+     * exists yet or the new cost is strictly lower than the recorded one.
+     *
+     * @return true if the table was updated.
+     */
+    public boolean updateLowestCostTable(
+            PhysicalProperties parentOutputProperties,
+            List<PhysicalProperties> childrenInputProperties,
+            double cost) {
+        boolean shouldUpdate = !lowestCostTable.containsKey(parentOutputProperties)
+                || lowestCostTable.get(parentOutputProperties).first > cost;
+        if (!shouldUpdate) {
+            return false;
+        }
+        lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
+        return true;
+    }
+
+    /**
+     * Remember that this expression answers a request for {@code requiredPropertySet} with
+     * {@code outputPropertySet}; the mapping is read back by getPropertyFromMap.
+     */
+    public void putOutputPropertiesMap(PhysicalProperties outputPropertySet,
+            PhysicalProperties requiredPropertySet) {
+        requestPropertiesMap.put(requiredPropertySet, outputPropertySet);
+    }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
index 5924a75bdc..e07f4e46ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
@@ -193,4 +193,11 @@ public class Memo {
}
groups.remove(source);
}
+
+ /**
+ * Add enforcer expression into the target group.
+ *
+ * <p>NOTE(review): the body only calls {@code setParent(group)} on the expression. Unless setParent
+ * also registers it, the enforcer is not added to any of the group's expression lists — confirm intent.
+ */
+ public void addEnforcerPlan(GroupExpression groupExpression, Group group) {
+ groupExpression.setParent(group);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
index ad112935cf..a7f72ca3b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
@@ -23,9 +23,9 @@ package org.apache.doris.nereids.operators;
* 1. ANY: match any operator
* 2. MULTI: match multiple operators
* 3. FIXED: the leaf node of pattern tree, which can be matched by a single operator
- * but this operator cannot be used in rules
+ * but this operator cannot be used in rules
* 4. MULTI_FIXED: the leaf node of pattern tree, which can be matched by multiple operators,
- * but these operators cannot be used in rules
+ * but these operators cannot be used in rules
*/
public enum OperatorType {
UNKNOWN,
@@ -48,5 +48,6 @@ public enum OperatorType {
PHYSICAL_AGGREGATION,
PHYSICAL_SORT,
PHYSICAL_HASH_JOIN,
- PHYSICAL_EXCHANGE;
+ PHYSICAL_EXCHANGE,
+ PHYSICAL_DISTRIBUTION;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java
similarity index 58%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java
index 4122de6901..64c23ee024 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalDistribution.java
@@ -15,29 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.nereids.properties;
+package org.apache.doris.nereids.operators.plans.physical;
-import org.apache.doris.planner.DataPartition;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.properties.DistributionSpec;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import java.util.List;
/**
- * Base class for data distribution.
+ * Enforcer operator.
*/
-public class DistributionSpec {
-
- private DataPartition dataPartition;
+public class PhysicalDistribution extends PhysicalUnaryOperator {
- public DistributionSpec() {
- }
+ protected DistributionSpec distributionSpec;
- public DistributionSpec(DataPartition dataPartition) {
- this.dataPartition = dataPartition;
- }
- public DataPartition getDataPartition() {
- return dataPartition;
+    /**
+     * Create a distribution enforcer that redistributes its child according to {@code spec}.
+     *
+     * @param spec target distribution; previously ignored, leaving {@code distributionSpec} null.
+     */
+    public PhysicalDistribution(DistributionSpec spec) {
+        super(OperatorType.PHYSICAL_DISTRIBUTION);
+        this.distributionSpec = spec;
}
- public void setDataPartition(DataPartition dataPartition) {
- this.dataPartition = dataPartition;
+    /**
+     * The distribution enforcer carries no expressions; an empty list avoids handing callers a null.
+     */
+    @Override
+    public List<Expression> getExpressions() {
+        return java.util.Collections.emptyList();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java
new file mode 100644
index 0000000000..6df2cfe15c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.operators.Operator;
+import org.apache.doris.nereids.operators.OperatorVisitor;
+
+import java.util.List;
+
+/**
+ * Derives the output physical property of a group expression from the properties its children output.
+ *
+ * <p>Every operator currently falls back to {@link #visitOperator}, which returns a default
+ * {@link PhysicalProperties}.
+ */
+public class ChildrenOutputPropertyDeriver extends OperatorVisitor<PhysicalProperties, PlanContext> {
+    // Properties required of the group expression being derived.
+    final PhysicalProperties requirements;
+    // Output properties of the group expression's children.
+    final List<PhysicalProperties> childrenOutputProperties;
+
+    public ChildrenOutputPropertyDeriver(PhysicalProperties requirements,
+            List<PhysicalProperties> childrenOutputProperties) {
+        this.childrenOutputProperties = childrenOutputProperties;
+        this.requirements = requirements;
+    }
+
+    /**
+     * Derive the output property of {@code groupExpression} by visiting its operator.
+     *
+     * @param requirements properties required of groupExpression.
+     * @param childrenOutputProperties output properties of groupExpression's children.
+     * @param groupExpression the expression whose output property is derived.
+     * @return the derived output property.
+     */
+    public static PhysicalProperties getProperties(
+            PhysicalProperties requirements,
+            List<PhysicalProperties> childrenOutputProperties,
+            GroupExpression groupExpression) {
+        ChildrenOutputPropertyDeriver childrenOutputPropertyDeriver = new ChildrenOutputPropertyDeriver(requirements,
+                childrenOutputProperties);
+        return groupExpression.getOperator().accept(childrenOutputPropertyDeriver, new PlanContext(groupExpression));
+    }
+
+    public PhysicalProperties getRequirements() {
+        return requirements;
+    }
+
+    /**
+     * Default rule for operators without a specific derivation: a default {@link PhysicalProperties}.
+     */
+    @Override
+    public PhysicalProperties visitOperator(Operator node, PlanContext context) {
+        return new PhysicalProperties();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
index 4122de6901..cd0ea5a438 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
@@ -17,15 +17,21 @@
package org.apache.doris.nereids.properties;
+import org.apache.doris.nereids.memo.Group;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalDistribution;
import org.apache.doris.planner.DataPartition;
+import com.google.common.collect.Lists;
+
/**
- * Base class for data distribution.
+ * Spec of data distribution.
*/
public class DistributionSpec {
private DataPartition dataPartition;
+ // TODO: why exist?
public DistributionSpec() {
}
@@ -33,6 +39,16 @@ public class DistributionSpec {
this.dataPartition = dataPartition;
}
+ /**
+ * TODO: need read ORCA.
+ * Whether this `DistributionSpec` meets (satisfies) `other`; callers such as
+ * EnforceMissingPropertiesHelper pass the required spec as `other`.
+ *
+ * <p>NOTE: stub — always returns false, so every distribution requirement is currently treated as unmet.
+ *
+ * @param other another DistributionSpec.
+ */
+ public boolean meet(DistributionSpec other) {
+ return false;
+ }
+
public DataPartition getDataPartition() {
return dataPartition;
}
@@ -40,4 +56,15 @@ public class DistributionSpec {
public void setDataPartition(DataPartition dataPartition) {
this.dataPartition = dataPartition;
}
+
+    /**
+     * Create a distribution enforcer expression over {@code child} that produces this spec.
+     *
+     * <p>The spec itself (not a base-class copy) is handed to the enforcer, so subclasses such as
+     * GatherDistributionSpec keep their concrete type.
+     */
+    public GroupExpression addEnforcer(Group child) {
+        return new GroupExpression(new PhysicalDistribution(this), Lists.newArrayList(child));
+    }
+
+ // TODO
+ @Override
+ public boolean equals(Object obj) {
+ return super.equals(obj);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
new file mode 100644
index 0000000000..aaa7a0d113
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.cost.CostCalculator;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+
+import com.google.common.collect.Lists;
+
+/**
+ * When a parent requests properties that a group expression's output lacks, adds the missing parts by
+ * creating sort and/or distribution enforcers on top of its group and accumulating their cost.
+ */
+public class EnforceMissingPropertiesHelper {
+
+    private final JobContext context;
+    private final GroupExpression groupExpression;
+    // Running total cost, including every enforcer added so far.
+    private double curTotalCost;
+
+    public EnforceMissingPropertiesHelper(JobContext context, GroupExpression groupExpression,
+            double curTotalCost) {
+        this.context = context;
+        this.groupExpression = groupExpression;
+        this.curTotalCost = curTotalCost;
+    }
+
+    /**
+     * Enforce the parts of {@code required} that {@code output} does not meet.
+     *
+     * @param output property currently produced by the group expression.
+     * @param required property that must be produced.
+     * @return the property after enforcement paired with the updated total cost. When {@code output}
+     *         already meets both the order and the distribution of {@code required}, {@code output} is
+     *         returned unchanged (never null, since callers compare the result with equals).
+     */
+    public Pair<PhysicalProperties, Double> enforceProperty(PhysicalProperties output, PhysicalProperties required) {
+        boolean isMeetOrder = output.getOrderSpec().meet(required.getOrderSpec());
+        boolean isMeetDistribution = output.getDistributionSpec().meet(required.getDistributionSpec());
+
+        if (!isMeetDistribution && !isMeetOrder) {
+            return new Pair<>(enforceSortAndDistribution(output, required), curTotalCost);
+        } else if (isMeetDistribution && isMeetOrder) {
+            return new Pair<>(output, curTotalCost);
+        } else if (!isMeetDistribution) {
+            // TODO: when an order is also required, redistributing data may break the existing order,
+            //  so a sort enforcer would be needed here as well (as in enforceSortAndDistribution).
+            return new Pair<>(enforceDistribution(output, required), curTotalCost);
+        } else {
+            return new Pair<>(enforceSort(output, required), curTotalCost);
+        }
+    }
+
+    // Add a sort enforcer that produces required's order spec; the distribution is kept.
+    private PhysicalProperties enforceSort(PhysicalProperties oldOutputProperty, PhysicalProperties required) {
+        // clone
+        PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
+                oldOutputProperty.getOrderSpec());
+        newOutputProperty.setOrderSpec(required.getOrderSpec());
+        GroupExpression enforcer = required.getOrderSpec().addEnforcer(groupExpression.getParent());
+
+        updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
+
+        return newOutputProperty;
+    }
+
+    // Add a distribution enforcer that produces required's distribution spec; the order is kept.
+    private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputProperty,
+            PhysicalProperties required) {
+        PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
+                oldOutputProperty.getOrderSpec());
+        newOutputProperty.setDistributionSpec(required.getDistributionSpec());
+        GroupExpression enforcer = required.getDistributionSpec().addEnforcer(groupExpression.getParent());
+
+        updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
+
+        return newOutputProperty;
+    }
+
+    // Register the enforcer in the memo, add its cost, and offer it as the group's best plan for
+    // newOutputProperty.
+    private void updateCostWithEnforcer(GroupExpression enforcer,
+            PhysicalProperties oldOutputProperty,
+            PhysicalProperties newOutputProperty) {
+        context.getPlannerContext().getMemo().addEnforcerPlan(enforcer, groupExpression.getParent());
+        curTotalCost += CostCalculator.calculateCost(enforcer);
+
+        if (enforcer.updateLowestCostTable(newOutputProperty, Lists.newArrayList(oldOutputProperty), curTotalCost)) {
+            enforcer.putOutputPropertiesMap(newOutputProperty, newOutputProperty);
+        }
+        groupExpression.getParent().setBestPlan(enforcer, curTotalCost, newOutputProperty);
+    }
+
+    // Enforce both order and distribution; the enforcer order depends on the required distribution.
+    private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty,
+            PhysicalProperties requiredProperty) {
+        PhysicalProperties enforcedProperty;
+        // Test the spec type instead of `equals(new GatherDistributionSpec())`, which only works if
+        // DistributionSpec implements value equality.
+        if (requiredProperty.getDistributionSpec() instanceof GatherDistributionSpec) {
+            enforcedProperty = enforceSort(outputProperty, requiredProperty);
+            enforcedProperty = enforceDistribution(enforcedProperty, requiredProperty);
+        } else {
+            enforcedProperty = enforceDistribution(outputProperty, requiredProperty);
+            enforcedProperty = enforceSort(enforcedProperty, requiredProperty);
+        }
+
+        return enforcedProperty;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
similarity index 69%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
index 50899b7f31..e197ade368 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
@@ -18,18 +18,11 @@
package org.apache.doris.nereids.properties;
/**
- * Physical properties used in cascades.
+ * Re-shuffle.
*/
-public class PhysicalProperties {
- private DistributionSpec distributionDesc;
+public class GatherDistributionSpec extends DistributionSpec {
- public PhysicalProperties() {}
-
- public DistributionSpec getDistributionDesc() {
- return distributionDesc;
- }
-
- public void setDistributionDesc(DistributionSpec distributionDesc) {
- this.distributionDesc = distributionDesc;
+ public GatherDistributionSpec() {
+ super();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
index b9923db80a..7d6ee6eb8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
@@ -42,6 +42,15 @@ public class OrderKey {
this.nullFirst = nullFirst;
}
+ /**
+ * Check whether {@code other} describes exactly the same ordering as this key.
+ *
+ * @param other the OrderKey to compare with.
+ * @return true iff the expression, the {@code isAsc} flag and the {@code nullFirst} flag are all equal.
+ */
+ public boolean matches(OrderKey other) {
+ if (!expr.equals(other.expr)) {
+ return false;
+ }
+ return isAsc == other.isAsc && nullFirst == other.nullFirst;
+ }
+
public Expression getExpr() {
return expr;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
new file mode 100644
index 0000000000..1d1c3777ae
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.memo.Group;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHeapSort;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Spec of sort order.
+ */
+public class OrderSpec {
+ private final List<OrderKey> orderKeys;
+
+ public OrderSpec(List<OrderKey> orderKeys) {
+ this.orderKeys = orderKeys;
+ }
+
+ /**
+ * Whether other `OrderSpec` is satisfied the current `OrderSpec`.
+ *
+ * @param other another OrderSpec.
+ */
+ public boolean meet(OrderSpec other) {
+ if (this.orderKeys.size() < other.getOrderKeys().size()) {
+ return false;
+ }
+ for (int i = 0; i < other.getOrderKeys().size(); ++i) {
+ if (!this.orderKeys.get(i).matches(other.getOrderKeys().get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public GroupExpression addEnforcer(Group child) {
+ return new GroupExpression(
+ new PhysicalHeapSort(orderKeys, -1, 0),
+ Lists.newArrayList(child)
+ );
+ }
+
+ public List<OrderKey> getOrderKeys() {
+ return orderKeys;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java
new file mode 100644
index 0000000000..681500ed69
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.operators.Operator;
+import org.apache.doris.nereids.operators.OperatorVisitor;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Used for parent property drive.
+ */
+public class ParentRequiredPropertyDeriver extends OperatorVisitor<Void, PlanContext> {
+
+ PhysicalProperties requestPropertyFromParent;
+ List<List<PhysicalProperties>> requiredPropertyListList;
+
+ public ParentRequiredPropertyDeriver(JobContext context) {
+ this.requestPropertyFromParent = context.getRequiredProperties();
+ }
+
+ public List<List<PhysicalProperties>> getRequiredPropertyListList(GroupExpression groupExpression) {
+ requiredPropertyListList = Lists.newArrayList();
+ groupExpression.getOperator().accept(this, new PlanContext(groupExpression));
+ return requiredPropertyListList;
+ }
+
+ @Override
+ public Void visitOperator(Operator operator, PlanContext context) {
+ List<PhysicalProperties> requiredPropertyList = Lists.newArrayList();
+ for (int i = 0; i < context.getGroupExpression().arity(); i++) {
+ requiredPropertyList.add(new PhysicalProperties());
+ }
+ requiredPropertyListList.add(requiredPropertyList);
+ return null;
+ }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 50899b7f31..75b02d7945 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -19,17 +19,40 @@ package org.apache.doris.nereids.properties;
/**
* Physical properties used in cascades.
+ * TODO(wj): Do we need a `PhysicalPropertySpec` interface like NoisePage?
+ *
+ * <p>A null {@link OrderSpec} or {@link DistributionSpec} is treated by {@link #meet} as "no
+ * requirement"; the no-arg constructor leaves both specs null.
*/
public class PhysicalProperties {
- private DistributionSpec distributionDesc;
+ private OrderSpec orderSpec;
- public PhysicalProperties() {}
+ private DistributionSpec distributionSpec;
- public DistributionSpec getDistributionDesc() {
- return distributionDesc;
+ public PhysicalProperties() {
}
- public void setDistributionDesc(DistributionSpec distributionDesc) {
- this.distributionDesc = distributionDesc;
+ public PhysicalProperties(DistributionSpec distributionSpec, OrderSpec orderSpec) {
+ this.distributionSpec = distributionSpec;
+ this.orderSpec = orderSpec;
+ }
+
+ /**
+ * Whether these properties satisfy {@code other}: both the order spec and the distribution spec
+ * must meet the corresponding spec of {@code other}.
+ *
+ * <p>A null spec (or a null {@code other}) on the requirement side is always satisfied. A null spec
+ * on this side satisfies only a null requirement. This matters because properties built with the
+ * no-arg constructor have null specs and must not cause a NullPointerException here.
+ *
+ * @param other the properties to be satisfied.
+ * @return true iff both the order and the distribution requirements are met.
+ */
+ public boolean meet(PhysicalProperties other) {
+ if (other == null) {
+ return true;
+ }
+ // TODO: handle distributionSpec meet()
+ return orderMeets(orderSpec, other.orderSpec)
+ && distributionMeets(distributionSpec, other.distributionSpec);
+ }
+
+ // Null-tolerant OrderSpec#meet: a null requirement is always met; a null provided spec meets nothing else.
+ private static boolean orderMeets(OrderSpec provided, OrderSpec required) {
+ return required == null || (provided != null && provided.meet(required));
+ }
+
+ // Null-tolerant DistributionSpec#meet, using the same null convention as orderMeets.
+ private static boolean distributionMeets(DistributionSpec provided, DistributionSpec required) {
+ return required == null || (provided != null && provided.meet(required));
+ }
+
+ public OrderSpec getOrderSpec() {
+ return orderSpec;
+ }
+
+ public void setOrderSpec(OrderSpec orderSpec) {
+ this.orderSpec = orderSpec;
+ }
+
+ public DistributionSpec getDistributionSpec() {
+ return distributionSpec;
+ }
+
+ public void setDistributionSpec(DistributionSpec distributionSpec) {
+ this.distributionSpec = distributionSpec;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org