You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by hu...@apache.org on 2022/07/31 09:22:49 UTC
[doris] branch master updated: [feature](nereids): polish property deriver enforcer job (#11222)
This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e20e6624ab [feature](nereids): polish property deriver enforcer job (#11222)
e20e6624ab is described below
commit e20e6624ab07d1577f96d5b3694e43543894be05
Author: jakevin <ja...@gmail.com>
AuthorDate: Sun Jul 31 17:22:41 2022 +0800
[feature](nereids): polish property deriver enforcer job (#11222)
Polish property deriver enforcer job
---
.../java/org/apache/doris/nereids/PlanContext.java | 5 +
.../apache/doris/nereids/cost/CostCalculator.java | 61 +++++++---
.../nereids/jobs/cascades/CostAndEnforcerJob.java | 81 ++++++++-----
.../apache/doris/nereids/memo/GroupExpression.java | 12 +-
.../properties/ChildOutputPropertyDeriver.java | 106 +++++++++++++++++
.../properties/ChildrenOutputPropertyDeriver.java | 74 ------------
.../doris/nereids/properties/DistributionSpec.java | 60 +++++-----
...tributionSpec.java => DistributionSpecAny.java} | 28 +++--
...butionDesc.java => DistributionSpecGather.java} | 12 +-
.../nereids/properties/DistributionSpecHash.java | 113 ++++++++++++++++++
...onSpec.java => DistributionSpecReplicated.java} | 11 +-
.../properties/EnforceMissingPropertiesHelper.java | 19 +--
.../apache/doris/nereids/properties/OrderSpec.java | 17 +++
.../properties/ParentRequiredPropertyDeriver.java | 58 ----------
.../nereids/properties/PhysicalProperties.java | 38 ++++--
.../nereids/properties/RequestPropertyDeriver.java | 127 +++++++++++++++++++++
.../rules/exploration/join/JoinLAsscom.java | 3 +-
.../rules/exploration/join/JoinProjectLAsscom.java | 5 +-
.../apache/doris/nereids/trees/plans/JoinType.java | 26 +++--
.../apache/doris/nereids/util/ExpressionUtils.java | 24 ----
.../org/apache/doris/nereids/util/JoinUtils.java | 123 ++++++++++++++++++++
.../java/org/apache/doris/nereids/util/Utils.java | 12 ++
.../doris/nereids/jobs/CostAndEnforcerJobTest.java | 114 ++++++++++++++++++
23 files changed, 841 insertions(+), 288 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
index 9f37171747..9d352d7027 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
@@ -19,6 +19,7 @@
package org.apache.doris.nereids;
import org.apache.doris.common.Id;
+import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Slot;
@@ -52,6 +53,10 @@ public class PlanContext {
public PlanContext(GroupExpression groupExpression) {
this.groupExpression = groupExpression;
+
+ for (Group group : groupExpression.children()) {
+ childrenStats.add(group.getStatistics());
+ }
}
public Plan getPlan() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
index 0950ed9643..055232bda1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
@@ -22,7 +22,9 @@ import org.apache.doris.nereids.PlanContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribution;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -62,17 +64,49 @@ public class CostCalculator {
}
@Override
- public CostEstimate visitPhysicalAggregate(PhysicalAggregate<Plan> aggregate, PlanContext context) {
+ public CostEstimate visitPhysicalOlapScan(PhysicalOlapScan physicalOlapScan, PlanContext context) {
StatsDeriveResult statistics = context.getStatisticsWithCheck();
return CostEstimate.ofCpu(statistics.computeSize());
}
@Override
- public CostEstimate visitPhysicalOlapScan(PhysicalOlapScan physicalOlapScan, PlanContext context) {
+ public CostEstimate visitPhysicalProject(PhysicalProject physicalProject, PlanContext context) {
StatsDeriveResult statistics = context.getStatisticsWithCheck();
return CostEstimate.ofCpu(statistics.computeSize());
}
+ @Override
+ public CostEstimate visitPhysicalHeapSort(PhysicalHeapSort physicalHeapSort, PlanContext context) {
+ // TODO: consider two-phase sort and enforcer.
+ StatsDeriveResult statistics = context.getStatisticsWithCheck();
+ StatsDeriveResult childStatistics = context.getChildStatistics(0);
+
+ return new CostEstimate(
+ childStatistics.computeSize(),
+ statistics.computeSize(),
+ childStatistics.computeSize());
+ }
+
+ @Override
+ public CostEstimate visitPhysicalDistribution(PhysicalDistribution physicalDistribution, PlanContext context) {
+ StatsDeriveResult statistics = context.getStatisticsWithCheck();
+ StatsDeriveResult childStatistics = context.getChildStatistics(0);
+
+ return new CostEstimate(
+ childStatistics.computeSize(),
+ statistics.computeSize(),
+ childStatistics.computeSize());
+ }
+
+ @Override
+ public CostEstimate visitPhysicalAggregate(PhysicalAggregate<Plan> aggregate, PlanContext context) {
+ // TODO: stage.....
+
+ StatsDeriveResult statistics = context.getStatisticsWithCheck();
+ StatsDeriveResult inputStatistics = context.getChildStatistics(0);
+ return new CostEstimate(inputStatistics.computeSize(), statistics.computeSize(), 0);
+ }
+
@Override
public CostEstimate visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> physicalHashJoin, PlanContext context) {
Preconditions.checkState(context.getGroupExpression().arity() == 2);
@@ -80,23 +114,24 @@ public class CostCalculator {
StatsDeriveResult leftStatistics = context.getChildStatistics(0);
StatsDeriveResult rightStatistics = context.getChildStatistics(1);
-
- // TODO: handle some case
-
List<Id> leftIds = context.getChildOutputIds(0);
List<Id> rightIds = context.getChildOutputIds(1);
+ // TODO: handle some case
// handle cross join, onClause is empty .....
-
+ if (physicalHashJoin.getJoinType().isCrossJoin()) {
+ return new CostEstimate(
+ leftStatistics.computeColumnSize(leftIds) + rightStatistics.computeColumnSize(rightIds),
+ rightStatistics.computeColumnSize(rightIds),
+ 0);
+ }
+
+ // TODO: network 0?
return new CostEstimate(
- leftStatistics.computeColumnSize(leftIds) + rightStatistics.computeColumnSize(rightIds),
- rightStatistics.computeColumnSize(rightIds), 0);
+ (leftStatistics.computeColumnSize(leftIds) + rightStatistics.computeColumnSize(rightIds)) / 2,
+ rightStatistics.computeColumnSize(rightIds),
+ 0);
}
- @Override
- public CostEstimate visitPhysicalProject(PhysicalProject physicalProject, PlanContext context) {
- StatsDeriveResult statistics = context.getStatisticsWithCheck();
- return CostEstimate.ofCpu(statistics.computeSize());
- }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
index 520f219522..cb3f88cafe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
@@ -25,10 +25,10 @@ import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.JobType;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
-import org.apache.doris.nereids.properties.ChildrenOutputPropertyDeriver;
+import org.apache.doris.nereids.properties.ChildOutputPropertyDeriver;
import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper;
-import org.apache.doris.nereids.properties.ParentRequiredPropertyDeriver;
import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.properties.RequestPropertyDeriver;
import com.google.common.collect.Lists;
@@ -37,17 +37,18 @@ import java.util.Optional;
/**
* Job to compute cost and add enforcer.
+ * Inspired by NoisePage and ORCA-Paper.
*/
-public class CostAndEnforcerJob extends Job {
+public class CostAndEnforcerJob extends Job implements Cloneable {
// GroupExpression to optimize
private final GroupExpression groupExpression;
// Current total cost
private double curTotalCost;
- // Properties from parent plan node.
- // Like: Physical Hash Join
- // [ [Properties ["", ANY], Properties ["", BROADCAST]],
- // [Properties ["", SHUFFLE_JOIN], Properties ["", SHUFFLE_JOIN]] ]
+ // Children properties from parent plan node.
+ // Example: Physical Hash Join
+ // [ [Properties {"", ANY}, Properties {"", BROADCAST}],
+ // [Properties {"", SHUFFLE_JOIN}, Properties {"", SHUFFLE_JOIN}]]
private List<List<PhysicalProperties>> propertiesListList;
private List<GroupExpression> childrenBestGroupExprList;
@@ -78,17 +79,43 @@ public class CostAndEnforcerJob extends Job {
}
}
+ /*-
+ * Please read the ORCA paper
+ * - 4.1.4 Optimization.
+ * - Figure 7
+ *
+ * currentJobSubPlanRoot
+ * / ▲ ▲ \
+ * requested/ /childOutput childOutput\ \requested
+ * Properties/ /Properties Properties\ \Properties
+ * ▼ / \ ▼
+ * child child
+ *
+ *
+ * requestPropertyFromParent parentPlanNode
+ * ──► │ ──► ▲
+ * ▼ │
+ * requestPropertyToChildren ChildOutputProperty
+ *
+ * requestPropertyFromParent
+ * ┼
+ * ──► gap ──► add enforcer to fill the gap
+ * ┼
+ * ChildOutputProperty
+ */
+
/**
* execute.
*/
+ // @Override
public void execute1() {
// Do init logic of root plan/groupExpr of `subplan`, only run once per task.
- if (curChildIndex != -1) {
+ if (curChildIndex == -1) {
curTotalCost = 0;
// Get property from groupExpression plan (it's root of subplan).
- ParentRequiredPropertyDeriver parentRequiredPropertyDeriver = new ParentRequiredPropertyDeriver(context);
- propertiesListList = parentRequiredPropertyDeriver.getRequiredPropertyListList(groupExpression);
+ RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(context);
+ propertiesListList = requestPropertyDeriver.getRequiredPropertyListList(groupExpression);
curChildIndex = 0;
}
@@ -135,7 +162,6 @@ public class CostAndEnforcerJob extends Job {
childrenInputProperties.set(curChildIndex, childOutputProperty);
// todo: check whether split agg broadcast row count limit.
-
curTotalCost += lowestCostExpr.getLowestCostTable().get(childInputProperties).first;
if (curTotalCost > context.getCostUpperBound()) {
break;
@@ -148,7 +174,7 @@ public class CostAndEnforcerJob extends Job {
// best expr from the child group
// TODO: it could update the cost.
- PhysicalProperties outputProperty = ChildrenOutputPropertyDeriver.getProperties(
+ PhysicalProperties outputProperty = ChildOutputPropertyDeriver.getProperties(
context.getRequiredProperties(),
childrenOutputProperties, groupExpression);
@@ -161,10 +187,14 @@ public class CostAndEnforcerJob extends Job {
return;
}
PlanContext planContext = new PlanContext(groupExpression);
- // TODO: calculate stats.
+ // TODO: calculate stats. ??????
groupExpression.getOwnerGroup().setStatistics(planContext.getStatistics());
enforce(outputProperty, childrenInputProperties);
+
+ if (curTotalCost < context.getCostUpperBound()) {
+ context.setCostUpperBound(curTotalCost);
+ }
}
// Reset child idx and total cost
@@ -174,37 +204,33 @@ public class CostAndEnforcerJob extends Job {
}
}
- private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> inputProperties) {
+ private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> childrenInputProperties) {
// groupExpression can satisfy its own output property
- putProperty(groupExpression, outputProperty, outputProperty, inputProperties);
+ putProperty(groupExpression, outputProperty, outputProperty, childrenInputProperties);
// groupExpression can satisfy the ANY type output property
- putProperty(groupExpression, outputProperty, new PhysicalProperties(), inputProperties);
+ putProperty(groupExpression, outputProperty, new PhysicalProperties(), childrenInputProperties);
EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new EnforceMissingPropertiesHelper(context,
groupExpression, curTotalCost);
- PhysicalProperties requiredProperties = context.getRequiredProperties();
- if (outputProperty.meet(requiredProperties)) {
+ PhysicalProperties requestedProperties = context.getRequiredProperties();
+ if (!outputProperty.meet(requestedProperties)) {
Pair<PhysicalProperties, Double> pair = enforceMissingPropertiesHelper.enforceProperty(outputProperty,
- requiredProperties);
+ requestedProperties);
PhysicalProperties addEnforcedProperty = pair.first;
curTotalCost = pair.second;
// enforcedProperty is superset of requiredProperty
- if (!addEnforcedProperty.equals(requiredProperties)) {
+ if (!addEnforcedProperty.equals(requestedProperties)) {
putProperty(groupExpression.getOwnerGroup().getBestExpression(addEnforcedProperty),
- requiredProperties, requiredProperties, Lists.newArrayList(outputProperty));
+ requestedProperties, requestedProperties, Lists.newArrayList(outputProperty));
}
} else {
- if (!outputProperty.equals(requiredProperties)) {
- putProperty(groupExpression, outputProperty, requiredProperties, inputProperties);
+ if (!outputProperty.equals(requestedProperties)) {
+ putProperty(groupExpression, outputProperty, requestedProperties, childrenInputProperties);
}
}
-
- if (curTotalCost < context.getCostUpperBound()) {
- context.setCostUpperBound(curTotalCost);
- }
}
private void putProperty(GroupExpression groupExpression,
@@ -231,6 +257,7 @@ public class CostAndEnforcerJob extends Job {
try {
task = (CostAndEnforcerJob) super.clone();
} catch (CloneNotSupportedException ignored) {
+ ignored.printStackTrace();
return null;
}
return task;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
index 27dbb0257d..d7a1508ace 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
@@ -136,19 +136,19 @@ public class GroupExpression {
}
/**
- * Add a (parentOutputProperties) -> (cost, childrenInputProperties) in lowestCostTable.
+ * Add a (outputProperties) -> (cost, childrenInputProperties) in lowestCostTable.
*/
public boolean updateLowestCostTable(
- PhysicalProperties parentOutputProperties,
+ PhysicalProperties outputProperties,
List<PhysicalProperties> childrenInputProperties,
double cost) {
- if (lowestCostTable.containsKey(parentOutputProperties)) {
- if (lowestCostTable.get(parentOutputProperties).first > cost) {
- lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
+ if (lowestCostTable.containsKey(outputProperties)) {
+ if (lowestCostTable.get(outputProperties).first > cost) {
+ lowestCostTable.put(outputProperties, new Pair<>(cost, childrenInputProperties));
return true;
}
} else {
- lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
+ lowestCostTable.put(outputProperties, new Pair<>(cost, childrenInputProperties));
return true;
}
return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
new file mode 100644
index 0000000000..5f6b96d82d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+
+/**
+ * Used for property derivation.
+ */
+public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, PlanContext> {
+ /*
+ * parentPlanNode
+ * ▲
+ * │
+ * childOutputProperty
+ */
+ PhysicalProperties requestProperty;
+ List<PhysicalProperties> childrenOutputProperties;
+
+ public ChildOutputPropertyDeriver(PhysicalProperties requestProperty,
+ List<PhysicalProperties> childrenOutputProperties) {
+ this.childrenOutputProperties = childrenOutputProperties;
+ this.requestProperty = requestProperty;
+ }
+
+ public static PhysicalProperties getProperties(
+ PhysicalProperties requirements,
+ List<PhysicalProperties> childrenOutputProperties,
+ GroupExpression groupExpression) {
+
+ ChildOutputPropertyDeriver childOutputPropertyDeriver = new ChildOutputPropertyDeriver(requirements,
+ childrenOutputProperties);
+
+ return groupExpression.getPlan().accept(childOutputPropertyDeriver, new PlanContext(groupExpression));
+ }
+
+ public PhysicalProperties getRequestProperty() {
+ return requestProperty;
+ }
+
+ @Override
+ public PhysicalProperties visit(Plan plan, PlanContext context) {
+ return new PhysicalProperties();
+ }
+
+ @Override
+ public PhysicalProperties visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> hashJoin, PlanContext context) {
+ Preconditions.checkState(childrenOutputProperties.size() == 2);
+ PhysicalProperties leftOutputProperty = childrenOutputProperties.get(0);
+ PhysicalProperties rightOutputProperty = childrenOutputProperties.get(1);
+
+ // broadcast
+ if (rightOutputProperty.getDistributionSpec() instanceof DistributionSpecReplicated) {
+ // TODO
+ return leftOutputProperty;
+ }
+
+ // shuffle
+ // List<SlotReference> leftSlotRefs = hashJoin.left().getOutput().stream().map(slot -> (SlotReference) slot)
+ // .collect(Collectors.toList());
+ // List<SlotReference> rightSlotRefs = hashJoin.right().getOutput().stream().map(slot -> (SlotReference) slot)
+ // .collect(Collectors.toList());
+
+ // List<SlotReference> leftOnSlotRefs;
+ // List<SlotReference> rightOnSlotRefs;
+ // Preconditions.checkState(leftOnSlotRefs.size() == rightOnSlotRefs.size());
+ DistributionSpec leftDistribution = leftOutputProperty.getDistributionSpec();
+ DistributionSpec rightDistribution = rightOutputProperty.getDistributionSpec();
+ if (!(leftDistribution instanceof DistributionSpecHash)
+ || !(rightDistribution instanceof DistributionSpecHash)) {
+ Preconditions.checkState(false, "error");
+ return new PhysicalProperties();
+ }
+
+ return leftOutputProperty;
+ }
+
+ @Override
+ public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanContext context) {
+ return olapScan.getPhysicalProperties();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java
deleted file mode 100644
index 8b991fea1f..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenOutputPropertyDeriver.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.properties;
-
-import org.apache.doris.nereids.PlanContext;
-import org.apache.doris.nereids.memo.GroupExpression;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-
-import java.util.List;
-
-/**
- * Used for property drive.
- */
-public class ChildrenOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, PlanContext> {
- PhysicalProperties requirements;
- List<PhysicalProperties> childrenOutputProperties;
-
- public ChildrenOutputPropertyDeriver(PhysicalProperties requirements,
- List<PhysicalProperties> childrenOutputProperties) {
- this.childrenOutputProperties = childrenOutputProperties;
- this.requirements = requirements;
- }
-
- public static PhysicalProperties getProperties(
- PhysicalProperties requirements,
- List<PhysicalProperties> childrenOutputProperties,
- GroupExpression groupExpression) {
-
- ChildrenOutputPropertyDeriver childrenOutputPropertyDeriver = new ChildrenOutputPropertyDeriver(requirements,
- childrenOutputProperties);
-
- return groupExpression.getPlan().accept(childrenOutputPropertyDeriver, new PlanContext(groupExpression));
- }
-
- public PhysicalProperties getRequirements() {
- return requirements;
- }
-
- // public List<List<PhysicalProperties>> getProperties(GroupExpression groupExpression) {
- // properties = Lists.newArrayList();
- // groupExpression.getPlan().accept(this, new PlanContext(groupExpression));
- // return properties;
- // }
-
- // @Override
- // public Void visit(Plan plan, PlanContext context) {
- // List<PhysicalProperties> props = Lists.newArrayList();
- // for (int childIndex = 0; childIndex < context.getGroupExpression().arity(); ++childIndex) {
- // props.add(new PhysicalProperties());
- // }
- // properties.add(props);
- // return null;
- // }
- @Override
- public PhysicalProperties visit(Plan plan, PlanContext context) {
- return new PhysicalProperties();
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
index e849a1396f..d5568de293 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
@@ -21,52 +21,48 @@ import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribution;
-import org.apache.doris.planner.DataPartition;
import com.google.common.collect.Lists;
/**
* Spec of data distribution.
+ * GPORCA has more types in CDistributionSpec.
*/
-public class DistributionSpec {
-
- private DataPartition dataPartition;
-
- // TODO: why exist?
- public DistributionSpec() {
- }
-
- public DistributionSpec(DataPartition dataPartition) {
- this.dataPartition = dataPartition;
- }
-
+public abstract class DistributionSpec {
/**
- * TODO: need read ORCA.
- * Whether other `DistributionSpec` is satisfied the current `DistributionSpec`.
- *
- * @param other another DistributionSpec.
+ * Whether this DistributionSpec satisfies the other DistributionSpec.
+ * Example:
+ * `DistributionSpecGather` satisfies `DistributionSpecAny`
*/
- public boolean meet(DistributionSpec other) {
- return false;
- }
-
- public DataPartition getDataPartition() {
- return dataPartition;
- }
-
- public void setDataPartition(DataPartition dataPartition) {
- this.dataPartition = dataPartition;
- }
+ public abstract boolean satisfy(DistributionSpec other);
+ /**
+ * Add physical operator of enforcer.
+ */
public GroupExpression addEnforcer(Group child) {
+ // TODO: maybe we need to create a new LogicalProperties, or just not set logical properties for this node.
+ // If we don't set LogicalProperties explicitly, the node computes applicable LogicalProperties for itself.
PhysicalDistribution distribution = new PhysicalDistribution(
- new DistributionSpec(dataPartition), child.getLogicalProperties(), new GroupPlan(child));
+ this,
+ child.getLogicalProperties(),
+ new GroupPlan(child));
return new GroupExpression(distribution, Lists.newArrayList(child));
}
- // TODO
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ return true;
+ }
+
@Override
- public boolean equals(Object obj) {
- return super.equals(obj);
+ public String toString() {
+ return this.getClass().toString();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
similarity index 65%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
index ade20758ff..0457b8204b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
@@ -17,25 +17,23 @@
package org.apache.doris.nereids.properties;
-import org.apache.doris.analysis.HashDistributionDesc;
-
/**
- * Describe hash distribution.
+ * Data can be anywhere on the segments (required only).
*/
-public class HashDistributionSpec extends DistributionSpec {
+public class DistributionSpecAny extends DistributionSpec {
- /**
- * Enums for concrete shuffle type.
- */
- public enum ShuffleType {
- COLOCATE,
- BUCKET,
- AGG,
- NORMAL
- }
+ private static DistributionSpecAny instance = new DistributionSpecAny();
- private ShuffleType shuffleType;
+ private DistributionSpecAny() {
+ super();
+ }
- private HashDistributionDesc hashDistributionDesc;
+ public static DistributionSpecAny getInstance() {
+ return instance;
+ }
+ @Override
+ public boolean satisfy(DistributionSpec other) {
+ return other instanceof DistributionSpecAny;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
similarity index 70%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
index f54ed12a19..709a582e0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
@@ -18,8 +18,16 @@
package org.apache.doris.nereids.properties;
/**
- * Describe random distribution.
+ * Gather distribution, which puts all data onto one node.
*/
-public class RandomDistributionDesc extends DistributionSpec {
+public class DistributionSpecGather extends DistributionSpec {
+ public DistributionSpecGather() {
+ super();
+ }
+
+ @Override
+ public boolean satisfy(DistributionSpec other) {
+ return other instanceof DistributionSpecGather || other instanceof DistributionSpecAny;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
new file mode 100644
index 0000000000..fb7cfe22df
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.annotation.Developing;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+
+import java.util.HashSet;
+import java.util.List;
+
+
+/**
+ * Describe hash distribution.
+ */
+@Developing
+public class DistributionSpecHash extends DistributionSpec {
+
+ private final List<SlotReference> shuffledColumns;
+
+ private final ShuffleType shuffleType;
+
+ public DistributionSpecHash(List<SlotReference> shuffledColumns, ShuffleType shuffleType) {
+ // Preconditions.checkState(!shuffledColumns.isEmpty());
+ this.shuffledColumns = shuffledColumns;
+ this.shuffleType = shuffleType;
+ }
+
+ public List<SlotReference> getShuffledColumns() {
+ return shuffledColumns;
+ }
+
+ public ShuffleType getShuffleType() {
+ return shuffleType;
+ }
+
+
+ @Override
+ public boolean satisfy(DistributionSpec other) {
+ if (other instanceof DistributionSpecAny) {
+ return true;
+ }
+
+ if (!(other instanceof DistributionSpecHash)) {
+ return false;
+ }
+
+ DistributionSpecHash spec = (DistributionSpecHash) other;
+
+ if (shuffledColumns.size() > spec.shuffledColumns.size()) {
+ return false;
+ }
+
+ // TODO: verify whether the following logic is correct; more cases may need to be considered.
+
+ // Current is LOCAL and other is AGG: satisfied if other's columns contain all of current's.
+ if (shuffleType == ShuffleType.LOCAL && spec.shuffleType == ShuffleType.AGG) {
+ return new HashSet<>(spec.shuffledColumns).containsAll(shuffledColumns);
+ }
+
+ if (shuffleType == ShuffleType.AGG && spec.shuffleType == ShuffleType.JOIN) {
+ return shuffledColumns.size() == spec.shuffledColumns.size()
+ && shuffledColumns.equals(spec.shuffledColumns);
+ } else if (shuffleType == ShuffleType.JOIN && spec.shuffleType == ShuffleType.AGG) {
+ return new HashSet<>(spec.shuffledColumns).containsAll(shuffledColumns);
+ }
+
+ if (!shuffleType.equals(spec.shuffleType)) {
+ return false;
+ }
+
+ return shuffledColumns.size() == spec.shuffledColumns.size()
+ && shuffledColumns.equals(spec.shuffledColumns);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!super.equals(o)) {
+ return false;
+ }
+ DistributionSpecHash that = (DistributionSpecHash) o;
+ return shuffledColumns.equals(that.shuffledColumns)
+ && shuffleType.equals(that.shuffleType);
+ // && propertyInfo.equals(that.propertyInfo)
+ }
+
+ /**
+ * Enums for concrete shuffle type.
+ */
+ public enum ShuffleType {
+ LOCAL,
+ BUCKET,
+ // Shuffle Aggregation
+ AGG,
+ // Shuffle Join
+ JOIN,
+ ENFORCE
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
similarity index 74%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
index e197ade368..850dc11995 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/GatherDistributionSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
@@ -18,11 +18,12 @@
package org.apache.doris.nereids.properties;
/**
- * Re-shuffle.
+ * Data is replicated across all segments.
+ * Like: broadcast join.
*/
-public class GatherDistributionSpec extends DistributionSpec {
-
- public GatherDistributionSpec() {
- super();
+public class DistributionSpecReplicated extends DistributionSpec {
+ @Override
+ public boolean satisfy(DistributionSpec other) {
+ return other instanceof DistributionSpecReplicated || other instanceof DistributionSpecAny;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
index 54d26f5ca6..73d23cbb27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
@@ -47,13 +47,17 @@ public class EnforceMissingPropertiesHelper {
*/
public Pair<PhysicalProperties, Double> enforceProperty(PhysicalProperties output, PhysicalProperties required) {
boolean isMeetOrder = output.getOrderSpec().meet(required.getOrderSpec());
- boolean isMeetDistribution = output.getDistributionSpec().meet(required.getDistributionSpec());
+ boolean isMeetDistribution = output.getDistributionSpec().satisfy(required.getDistributionSpec());
if (!isMeetDistribution && !isMeetOrder) {
+ // Neither distribution nor order is satisfied.
return new Pair<>(enforceSortAndDistribution(output, required), curTotalCost);
} else if (isMeetDistribution && isMeetOrder) {
+ // Both satisfy.
+ // TODO: can't reach here.
return new Pair<>(null, curTotalCost);
} else if (!isMeetDistribution) {
+ // Distribution isn't satisfied.
if (required.getOrderSpec().getOrderKeys().isEmpty()) {
return new Pair<>(enforceDistribution(output), curTotalCost);
} else {
@@ -67,15 +71,16 @@ public class EnforceMissingPropertiesHelper {
return new Pair<>(enforceDistribution(output), curTotalCost);
}
} else {
+ // Order isn't satisfied.
return new Pair<>(enforceSort(output), curTotalCost);
}
}
private PhysicalProperties enforceSort(PhysicalProperties oldOutputProperty) {
// clone
- PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
- oldOutputProperty.getOrderSpec());
- newOutputProperty.setOrderSpec(context.getRequiredProperties().getOrderSpec());
+ PhysicalProperties newOutputProperty = new PhysicalProperties(
+ oldOutputProperty.getDistributionSpec(),
+ context.getRequiredProperties().getOrderSpec());
GroupExpression enforcer =
context.getRequiredProperties().getOrderSpec().addEnforcer(groupExpression.getOwnerGroup());
@@ -85,9 +90,9 @@ public class EnforceMissingPropertiesHelper {
}
private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputProperty) {
- PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
+ PhysicalProperties newOutputProperty = new PhysicalProperties(
+ context.getRequiredProperties().getDistributionSpec(),
oldOutputProperty.getOrderSpec());
- newOutputProperty.setDistributionSpec(context.getRequiredProperties().getDistributionSpec());
GroupExpression enforcer =
context.getRequiredProperties().getDistributionSpec().addEnforcer(groupExpression.getOwnerGroup());
@@ -112,7 +117,7 @@ public class EnforceMissingPropertiesHelper {
PhysicalProperties requiredProperty) {
PhysicalProperties enforcedProperty;
if (requiredProperty.getDistributionSpec()
- .equals(new GatherDistributionSpec())) {
+ .equals(new DistributionSpecGather())) {
enforcedProperty = enforceSort(outputProperty);
enforcedProperty = enforceDistribution(enforcedProperty);
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
index 196bbf0cc6..8b0f38aa0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
@@ -32,6 +32,10 @@ import java.util.List;
public class OrderSpec {
private final List<OrderKey> orderKeys;
+ public OrderSpec() {
+ this.orderKeys = Lists.newArrayList();
+ }
+
public OrderSpec(List<OrderKey> orderKeys) {
this.orderKeys = orderKeys;
}
@@ -45,6 +49,7 @@ public class OrderSpec {
if (this.orderKeys.size() < other.getOrderKeys().size()) {
return false;
}
+
for (int i = 0; i < other.getOrderKeys().size(); ++i) {
if (!this.orderKeys.get(i).matches(other.getOrderKeys().get(i))) {
return false;
@@ -63,4 +68,16 @@ public class OrderSpec {
public List<OrderKey> getOrderKeys() {
return orderKeys;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OrderSpec that = (OrderSpec) o;
+ return orderKeys.equals(that.orderKeys);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java
deleted file mode 100644
index f78fd2c601..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ParentRequiredPropertyDeriver.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.properties;
-
-import org.apache.doris.nereids.PlanContext;
-import org.apache.doris.nereids.jobs.JobContext;
-import org.apache.doris.nereids.memo.GroupExpression;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-/**
- * Used for parent property drive.
- */
-public class ParentRequiredPropertyDeriver extends PlanVisitor<Void, PlanContext> {
-
- PhysicalProperties requestPropertyFromParent;
- List<List<PhysicalProperties>> requiredPropertyListList;
-
- public ParentRequiredPropertyDeriver(JobContext context) {
- this.requestPropertyFromParent = context.getRequiredProperties();
- }
-
- public List<List<PhysicalProperties>> getRequiredPropertyListList(GroupExpression groupExpression) {
- requiredPropertyListList = Lists.newArrayList();
- groupExpression.getPlan().accept(this, new PlanContext(groupExpression));
- return requiredPropertyListList;
- }
-
- @Override
- public Void visit(Plan plan, PlanContext context) {
- List<PhysicalProperties> requiredPropertyList = Lists.newArrayList();
- for (int i = 0; i < context.getGroupExpression().arity(); i++) {
- requiredPropertyList.add(new PhysicalProperties());
- }
- requiredPropertyListList.add(requiredPropertyList);
- return null;
- }
-
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index 75b02d7945..feb448b33b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -22,11 +22,23 @@ package org.apache.doris.nereids.properties;
* TODO(wj): Do we need to `PhysicalPropertySpec` Interface like NoisePage?
*/
public class PhysicalProperties {
- private OrderSpec orderSpec;
+ private final OrderSpec orderSpec;
- private DistributionSpec distributionSpec;
+ private final DistributionSpec distributionSpec;
public PhysicalProperties() {
+ this.orderSpec = new OrderSpec();
+ this.distributionSpec = DistributionSpecAny.getInstance();
+ }
+
+ public PhysicalProperties(DistributionSpec distributionSpec) {
+ this.distributionSpec = distributionSpec;
+ this.orderSpec = new OrderSpec();
+ }
+
+ public PhysicalProperties(OrderSpec orderSpec) {
+ this.orderSpec = orderSpec;
+ this.distributionSpec = DistributionSpecAny.getInstance();
}
public PhysicalProperties(DistributionSpec distributionSpec, OrderSpec orderSpec) {
@@ -34,25 +46,29 @@ public class PhysicalProperties {
this.orderSpec = orderSpec;
}
+ // Whether the current properties satisfy the other properties.
public boolean meet(PhysicalProperties other) {
- // TODO: handle distributionSpec meet()
- return orderSpec.meet(other.orderSpec) && distributionSpec.meet(other.distributionSpec);
+ return orderSpec.meet(other.orderSpec) && distributionSpec.satisfy(other.distributionSpec);
}
-
public OrderSpec getOrderSpec() {
return orderSpec;
}
- public void setOrderSpec(OrderSpec orderSpec) {
- this.orderSpec = orderSpec;
- }
-
public DistributionSpec getDistributionSpec() {
return distributionSpec;
}
- public void setDistributionSpec(DistributionSpec distributionSpec) {
- this.distributionSpec = distributionSpec;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PhysicalProperties that = (PhysicalProperties) o;
+ return orderSpec.equals(that.orderSpec)
+ && distributionSpec.equals(that.distributionSpec);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
new file mode 100644
index 0000000000..848ed44f9d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.JoinUtils;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Derives the properties to request from children, given the properties requested by the parent.
+ */
+public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
+ /*
+ * requestPropertyFromParent
+ * │
+ * ▼
+ * curNode (current plan node in current CostAndEnforcerJob)
+ * │
+ * ▼
+ * requestPropertyToChildren
+ */
+ private PhysicalProperties requestPropertyFromParent;
+ private List<List<PhysicalProperties>> requestPropertyToChildren;
+
+ public RequestPropertyDeriver(JobContext context) {
+ this.requestPropertyFromParent = context.getRequiredProperties();
+ }
+
+ public List<List<PhysicalProperties>> getRequiredPropertyListList(GroupExpression groupExpression) {
+ requestPropertyToChildren = Lists.newArrayList();
+ groupExpression.getPlan().accept(this, new PlanContext(groupExpression));
+ return requestPropertyToChildren;
+ }
+
+ @Override
+ public Void visit(Plan plan, PlanContext context) {
+ List<PhysicalProperties> requiredPropertyList = Lists.newArrayList();
+ for (int i = 0; i < context.getGroupExpression().arity(); i++) {
+ requiredPropertyList.add(new PhysicalProperties());
+ }
+ requestPropertyToChildren.add(requiredPropertyList);
+ return null;
+ }
+
+ @Override
+ public Void visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> hashJoin, PlanContext context) {
+ // for broadcast join
+ List<PhysicalProperties> propertiesForBroadcast = Lists.newArrayList(
+ new PhysicalProperties(),
+ new PhysicalProperties(new DistributionSpecReplicated())
+ );
+ // for shuffle join
+ Pair<List<SlotReference>, List<SlotReference>> onClauseUsedSlots = JoinUtils.getOnClauseUsedSlots(hashJoin);
+ List<PhysicalProperties> propertiesForShuffle = Lists.newArrayList(
+ new PhysicalProperties(new DistributionSpecHash(onClauseUsedSlots.first, ShuffleType.JOIN)),
+ new PhysicalProperties(new DistributionSpecHash(onClauseUsedSlots.second, ShuffleType.JOIN)));
+
+ if (!JoinUtils.onlyBroadcast(hashJoin)) {
+ requestPropertyToChildren.add(propertiesForShuffle);
+ }
+ if (!JoinUtils.onlyShuffle(hashJoin)) {
+ requestPropertyToChildren.add(propertiesForBroadcast);
+ }
+
+ return null;
+ }
+
+ protected static List<PhysicalProperties> computeShuffleJoinRequiredProperties(
+ PhysicalProperties requestedProperty, List<SlotReference> leftShuffleColumns,
+ List<SlotReference> rightShuffleColumns) {
+
+ // The requested distribution isn't a hash distribution with shuffle type JOIN.
+ if (!(requestedProperty.getDistributionSpec() instanceof DistributionSpecHash
+ && ((DistributionSpecHash) requestedProperty.getDistributionSpec()).getShuffleType()
+ == ShuffleType.JOIN)) {
+ return Lists.newArrayList(
+ new PhysicalProperties(new DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+ new PhysicalProperties(new DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
+ }
+
+ // TODO: adjust the shuffle columns to the column order required by parent; every branch currently returns the same request.
+ DistributionSpecHash distributionSpec = (DistributionSpecHash) requestedProperty.getDistributionSpec();
+ List<SlotReference> requestedColumns = distributionSpec.getShuffledColumns();
+
+ boolean adjustBasedOnLeft = Utils.equalsIgnoreOrder(leftShuffleColumns, requestedColumns);
+ boolean adjustBasedOnRight = Utils.equalsIgnoreOrder(rightShuffleColumns, requestedColumns);
+ if (!adjustBasedOnLeft && !adjustBasedOnRight) {
+ return Lists.newArrayList(
+ new PhysicalProperties(new DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+ new PhysicalProperties(new DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
+ }
+
+ return Lists.newArrayList(
+ new PhysicalProperties(new DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+ new PhysicalProperties(new DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
+
+ }
+
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscom.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscom.java
index 031ac28c91..26fce95acc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscom.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscom.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -107,7 +108,7 @@ public class JoinLAsscom extends OneExplorationRuleFactory {
List<Expression> newTopJoinOnCondition = Lists.newArrayList();
for (Expression onCondition : allOnCondition) {
List<SlotReference> slots = onCondition.collect(SlotReference.class::isInstance);
- if (ExpressionUtils.containsAll(newBottomJoinSlots, slots)) {
+ if (new HashSet<>(newBottomJoinSlots).containsAll(slots)) {
newBottomJoinOnCondition.add(onCondition);
} else {
newTopJoinOnCondition.add(onCondition);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinProjectLAsscom.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinProjectLAsscom.java
index 7f5500d4cc..74dfa353cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinProjectLAsscom.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinProjectLAsscom.java
@@ -33,6 +33,7 @@ import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -115,7 +116,7 @@ public class JoinProjectLAsscom extends OneExplorationRuleFactory {
List<Expression> newTopJoinOnCondition = Lists.newArrayList();
for (Expression onCondition : allOnCondition) {
List<SlotReference> slots = onCondition.collect(SlotReference.class::isInstance);
- if (ExpressionUtils.containsAll(newBottomJoinSlots, slots)) {
+ if (new HashSet<>(newBottomJoinSlots).containsAll(slots)) {
newBottomJoinOnCondition.add(onCondition);
} else {
newTopJoinOnCondition.add(onCondition);
@@ -143,7 +144,7 @@ public class JoinProjectLAsscom extends OneExplorationRuleFactory {
List<NamedExpression> newLeftProjectExpr = Lists.newArrayList();
for (NamedExpression projectExpr : projectExprs) {
List<SlotReference> usedSlotRefs = projectExpr.collect(SlotReference.class::isInstance);
- if (ExpressionUtils.containsAll(bOutputSlots, usedSlotRefs)) {
+ if (new HashSet<>(bOutputSlots).containsAll(usedSlotRefs)) {
newRightProjectExprs.add(projectExpr);
} else {
newLeftProjectExpr.add(projectExpr);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
index f53edae272..77b7c447dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
@@ -88,27 +88,31 @@ public enum JoinType {
return this == CROSS_JOIN;
}
- public final boolean isInnerOrOuterOrCrossJoin() {
- return this == INNER_JOIN || this == CROSS_JOIN || this == FULL_OUTER_JOIN;
+ public final boolean isInnerJoin() {
+ return this == INNER_JOIN;
}
- public final boolean isSwapJoinType() {
- return joinSwapMap.containsKey(this);
+ public final boolean isRightJoin() {
+ return this == RIGHT_OUTER_JOIN;
}
- public JoinType swap() {
- return joinSwapMap.get(this);
+ public final boolean isFullOuterJoin() {
+ return this == FULL_OUTER_JOIN;
}
- public boolean isSemiOrAntiJoin() {
+ public final boolean isSemiOrAntiJoin() {
return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
}
- public boolean isInnerJoin() {
- return this == INNER_JOIN;
+ public final boolean isOuterJoin() {
+ return this == LEFT_OUTER_JOIN || this == RIGHT_OUTER_JOIN || this == FULL_OUTER_JOIN;
}
- public boolean isOuterJoin() {
- return this == LEFT_OUTER_JOIN || this == RIGHT_OUTER_JOIN || this == FULL_OUTER_JOIN;
+ public final boolean isSwapJoinType() {
+ return joinSwapMap.containsKey(this);
+ }
+
+ public JoinType swap() {
+ return joinSwapMap.get(this);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
index 3641fd7a4c..89218c25f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
@@ -135,28 +135,4 @@ public class ExpressionUtils {
}
return false;
}
-
- /**
- * Whether `List of SlotReference` contains a `SlotReference`.
- */
- public static boolean contains(List<SlotReference> list, SlotReference item) {
- for (SlotReference slotRefInList : list) {
- if (item.equals(slotRefInList)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Whether `List of SlotReference` contains all another `List of SlotReference`.
- */
- public static boolean containsAll(List<SlotReference> large, List<SlotReference> small) {
- for (SlotReference slotRefInSmall : small) {
- if (!contains(large, slotRefInSmall)) {
- return false;
- }
- }
- return true;
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
new file mode 100644
index 0000000000..04ccafcaee
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.util;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utils for join
+ */
+public class JoinUtils {
+ public static boolean onlyBroadcast(PhysicalHashJoin join) {
+ // A cross join can only be executed as a broadcast join.
+ return join.getJoinType().isCrossJoin();
+ }
+
+ public static boolean onlyShuffle(PhysicalHashJoin join) {
+ return join.getJoinType().isRightJoin() || join.getJoinType().isFullOuterJoin();
+ }
+
+ /**
+ * Get all equalTo from onClause of join
+ */
+ public static List<EqualTo> getEqualTo(PhysicalHashJoin<Plan, Plan> join) {
+ List<EqualTo> eqConjuncts = Lists.newArrayList();
+ if (!join.getCondition().isPresent()) {
+ return eqConjuncts;
+ }
+
+ List<SlotReference> leftSlots = join.left().getOutput().stream().map(slot -> (SlotReference) slot)
+ .collect(Collectors.toList());
+ List<SlotReference> rightSlots = join.right().getOutput().stream().map(slot -> (SlotReference) slot)
+ .collect(Collectors.toList());
+
+ Expression onCondition = join.getCondition().get();
+ List<Expression> conjunctList = ExpressionUtils.extractConjunctive(onCondition);
+ for (Expression predicate : conjunctList) {
+ if (isEqualTo(leftSlots, rightSlots, predicate)) {
+ eqConjuncts.add((EqualTo) predicate);
+ }
+ }
+ return eqConjuncts;
+ }
+
+ private static boolean isEqualTo(List<SlotReference> leftSlots, List<SlotReference> rightSlots,
+ Expression predicate) {
+ if (!(predicate instanceof EqualTo)) {
+ return false;
+ }
+
+ EqualTo equalTo = (EqualTo) predicate;
+ List<SlotReference> leftUsed = equalTo.left().collect(SlotReference.class::isInstance);
+ List<SlotReference> rightUsed = equalTo.right().collect(SlotReference.class::isInstance);
+ if (leftUsed.isEmpty() || rightUsed.isEmpty()) {
+ return false;
+ }
+
+ return Utils.equalsIgnoreOrder(leftUsed, leftSlots) || Utils.equalsIgnoreOrder(rightUsed, rightSlots);
+ }
+
+ /**
+ * Get all used slots from onClause of join.
+ * Return pair of left used slots and right used slots.
+ */
+ public static Pair<List<SlotReference>, List<SlotReference>> getOnClauseUsedSlots(
+ PhysicalHashJoin<Plan, Plan> join) {
+ Pair<List<SlotReference>, List<SlotReference>> childSlots =
+ new Pair<>(Lists.newArrayList(), Lists.newArrayList());
+
+ List<SlotReference> leftSlots = join.left().getOutput().stream().map(slot -> (SlotReference) slot)
+ .collect(Collectors.toList());
+ List<SlotReference> rightSlots = join.right().getOutput().stream().map(slot -> (SlotReference) slot)
+ .collect(Collectors.toList());
+ List<EqualTo> equalToList = getEqualTo(join);
+
+
+ for (EqualTo equalTo : equalToList) {
+ List<SlotReference> leftOnSlots = equalTo.left().collect(SlotReference.class::isInstance);
+ List<SlotReference> rightOnSlots = equalTo.right().collect(SlotReference.class::isInstance);
+
+ if (new HashSet<>(leftSlots).containsAll(leftOnSlots)
+ && new HashSet<>(rightSlots).containsAll(rightOnSlots)) {
+ // TODO: need to rethink `.get(0)`: only the first slot collected from each side is used.
+ childSlots.first.add(leftOnSlots.get(0));
+ childSlots.second.add(rightOnSlots.get(0));
+ } else if (new HashSet<>(leftSlots).containsAll(rightOnSlots)
+ && new HashSet<>(rightSlots).containsAll(leftOnSlots)) {
+ childSlots.first.add(rightOnSlots.get(0));
+ childSlots.second.add(leftOnSlots.get(0));
+ } else {
+ Preconditions.checkState(false, "error");
+ }
+ }
+
+ return childSlots;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
index 2500283e44..a5bc161570 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.util;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.StringUtils;
+import java.util.HashSet;
import java.util.List;
/**
@@ -95,4 +96,15 @@ public class Utils {
public static String qualifiedName(List<String> qualifier, String name) {
return StringUtils.join(qualifiedNameParts(qualifier, name), ".");
}
+
+
+ /**
+ * Equals for lists ignoring order: sizes must match and each list must contain every element of the other.
+ */
+ public static <E> boolean equalsIgnoreOrder(List<E> one, List<E> other) {
+ if (one.size() != other.size()) {
+ return false;
+ }
+ return new HashSet<>(one).containsAll(other) && new HashSet<>(other).containsAll(one);
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java
new file mode 100644
index 0000000000..d96f42b054
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.nereids.PlannerContext;
+import org.apache.doris.nereids.cost.CostCalculator;
+import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.memo.Memo;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.types.IntegerType;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+ public class CostAndEnforcerJobTest {
+     /**
+      * Creates an OLAP table with id 0 and two key columns: an INT column "id" and a STRING column "name".
+      */
+     private static OlapTable newTable(String tableName) {
+         Column idColumn = new Column("id", Type.INT, true, AggregateType.NONE, "0", "");
+         Column nameColumn = new Column("name", Type.STRING, true, AggregateType.NONE, "", "");
+         return new OlapTable(0L, tableName,
+                 ImmutableList.of(idColumn, nameColumn),
+                 KeysType.PRIMARY_KEYS,
+                 null, null);
+     }
+
+     /**
+      * Creates a physical scan over a freshly built table, using the table name as the only qualifier.
+      */
+     private static PhysicalOlapScan newScan(String tableName, LogicalProperties logicalProperties) {
+         return new PhysicalOlapScan(newTable(tableName), Lists.newArrayList(tableName), Optional.empty(),
+                 logicalProperties);
+     }
+
+     /**
+      * Creates a nullable integer slot named "id" qualified by the given table name.
+      */
+     private static SlotReference idSlot(String qualifier) {
+         return new SlotReference("id", new IntegerType(), true, ImmutableList.of(qualifier));
+     }
+
+     /**
+      * Creates the join condition {@code left.id = right.id}.
+      */
+     private static Expression idEquals(String leftQualifier, String rightQualifier) {
+         return new EqualTo(idSlot(leftQualifier), idSlot(rightQualifier));
+     }
+
+     /*
+      * Plan shape under test:
+      *
+      *      topJoin
+      *      /     \
+      *     C   bottomJoin
+      *           /    \
+      *          A      B
+      *
+      * The test runs the optimize-group job over this plan. It contains no assertions,
+      * so it only fails if the job pool throws.
+      */
+     @Test
+     public void testExecute(@Mocked LogicalProperties logicalProperties) {
+         // Replace the cost calculation so that every group expression costs 0.
+         new MockUp<CostCalculator>() {
+             @Mock
+             public double calculateCost(GroupExpression groupExpression) {
+                 return 0;
+             }
+         };
+
+         PhysicalOlapScan aScan = newScan("a", logicalProperties);
+         PhysicalOlapScan bScan = newScan("b", logicalProperties);
+         PhysicalPlan cScan = newScan("c", logicalProperties);
+
+         Expression bottomJoinOnCondition = idEquals("a", "b");
+         Expression topJoinOnCondition = idEquals("a", "c");
+
+         // bottomJoin = A JOIN B ON a.id = b.id
+         PhysicalHashJoin bottomJoin = new PhysicalHashJoin<>(JoinType.INNER_JOIN,
+                 Optional.of(bottomJoinOnCondition),
+                 logicalProperties, aScan, bScan);
+         // topJoin = C JOIN bottomJoin ON a.id = c.id
+         PhysicalHashJoin topJoin = new PhysicalHashJoin<>(JoinType.INNER_JOIN,
+                 Optional.of(topJoinOnCondition),
+                 logicalProperties, cScan, bottomJoin);
+
+         PlannerContext plannerContext = new Memo(topJoin)
+                 .newPlannerContext(new ConnectContext())
+                 .setDefaultJobContext();
+
+         // Schedule optimization of the memo's root group and drain the job pool.
+         plannerContext.pushJob(new OptimizeGroupJob(
+                 plannerContext.getMemo().getRoot(),
+                 plannerContext.getCurrentJobContext()));
+         plannerContext.getJobScheduler().executeJobPool(plannerContext);
+     }
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org