You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/06/14 11:40:03 UTC
[incubator-doris] branch master updated: [feature](nereids) Plan Translator (#9993)
This is an automated email from the ASF dual-hosted git repository.
lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 25b9d6eba2 [feature](nereids) Plan Translator (#9993)
25b9d6eba2 is described below
commit 25b9d6eba24480993941e7b32062279e325dc4fb
Author: Kikyou1997 <33...@users.noreply.github.com>
AuthorDate: Tue Jun 14 19:39:55 2022 +0800
[feature](nereids) Plan Translator (#9993)
Issue Number: close #9621
Add the following physical operators: PhysicalAggregation, PhysicalSort, PhysicalHashJoin
Add basic logic of plan translator
1. add new agg phase enum for nereids
2. remove the Analyzer from PlanContext.java
3. implement PlanTranslator::visitPhysicalFilter
---
.../org/apache/doris/analysis/AggregateInfo.java | 15 +-
.../org/apache/doris/analysis/DescriptorTable.java | 10 +
.../java/org/apache/doris/analysis/SortInfo.java | 13 +
.../apache/doris/nereids/PlanOperatorVisitor.java | 71 +++++
.../doris/nereids/operators/AbstractOperator.java | 27 ++
.../apache/doris/nereids/operators/Operator.java | 5 +
.../doris/nereids/operators/OperatorType.java | 4 +
.../doris/nereids/operators/plans/AggPhase.java | 53 ++++
.../doris/nereids/operators/plans/JoinType.java | 4 +-
.../plans/physical/PhysicalAggregation.java | 87 ++++++
.../operators/plans/physical/PhysicalFilter.java | 8 +
.../operators/plans/physical/PhysicalHashJoin.java | 62 ++++
.../operators/plans/physical/PhysicalOlapScan.java | 27 +-
.../operators/plans/physical/PhysicalOperator.java | 2 +
.../operators/plans/physical/PhysicalProject.java | 8 +
.../operators/plans/physical/PhysicalScan.java | 14 +-
.../operators/plans/physical/PhysicalSort.java | 77 +++++
...ysicalProperties.java => DistributionSpec.java} | 22 +-
...alProperties.java => HashDistributionSpec.java} | 20 +-
.../{PhysicalProperties.java => OrderKey.java} | 34 ++-
.../nereids/properties/PhysicalProperties.java | 9 +
...Properties.java => RandomDistributionDesc.java} | 4 +-
.../{Slot.java => ExpressionConverter.java} | 18 +-
.../trees/expressions/FunctionCallExpression.java | 62 ++++
.../doris/nereids/trees/expressions/Slot.java | 11 +
.../nereids/trees/expressions/SlotReference.java | 10 +-
.../trees/plans/PhysicalPlanTranslator.java | 322 +++++++++++++++++++++
.../org/apache/doris/nereids/trees/plans/Plan.java | 1 +
.../doris/nereids/trees/plans/PlanContext.java | 73 +++++
.../nereids/trees/plans/physical/PhysicalPlan.java | 1 +
.../java/org/apache/doris/nereids/util/Utils.java | 15 +
.../org/apache/doris/planner/AggregationNode.java | 10 +
.../org/apache/doris/planner/HashJoinNode.java | 108 +++++--
33 files changed, 1139 insertions(+), 68 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index d2ea7443e9..298837d11e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -48,6 +48,7 @@ import java.util.List;
* SELECT COUNT(*) FROM (SELECT DISTINCT a, b, ..., x, y, ...) GROUP BY x, y, ...
*
* The tree structure looks as follows:
+ * <pre>
* - for non-distinct aggregation:
* - aggInfo: contains the original aggregation functions and grouping exprs
* - aggInfo.mergeAggInfo: contains the merging aggregation functions (grouping
@@ -61,7 +62,7 @@ import java.util.List;
* computation (grouping exprs are identical)
* - aggInfo.2ndPhaseDistinctAggInfo.mergeAggInfo: contains the merging aggregate
* functions for the phase 2 computation (grouping exprs are identical)
- *
+ * </pre>
* In general, merging aggregate computations are idempotent; in other words,
* aggInfo.mergeAggInfo == aggInfo.mergeAggInfo.mergeAggInfo.
*
@@ -224,6 +225,17 @@ public final class AggregateInfo extends AggregateInfoBase {
return result;
}
+ /**
+ * Used by new optimizer.
+ */
+ public static AggregateInfo create(
+ ArrayList<Expr> groupingExprs, ArrayList<FunctionCallExpr> aggExprs,
+ TupleDescriptor tupleDesc, TupleDescriptor intermediateTupleDesc, AggPhase phase) {
+ AggregateInfo result = new AggregateInfo(groupingExprs, aggExprs, phase);
+ result.outputTupleDesc = tupleDesc;
+ result.intermediateTupleDesc = intermediateTupleDesc;
+ return result;
+ }
/**
* estimate if functions contains multi distinct
@@ -856,4 +868,5 @@ public final class AggregateInfo extends AggregateInfoBase {
public List<Expr> getInputPartitionExprs() {
return partitionExprs != null ? partitionExprs : groupingExprs;
}
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 526c6e4806..f733e3a9a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -73,6 +73,16 @@ public class DescriptorTable {
return result;
}
+ /**
+ * Used by new optimizer.
+ */
+ public SlotDescriptor addSlotDescriptor(TupleDescriptor d, int id) {
+ SlotDescriptor result = new SlotDescriptor(new SlotId(id), d);
+ d.addSlot(result);
+ slotDescs.put(result.getId(), result);
+ return result;
+ }
+
/**
* Create copy of src with new id. The returned descriptor has its mem layout
* computed.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
index d57856b08b..fe287e3900 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
@@ -73,6 +73,19 @@ public class SortInfo {
materializedOrderingExprs = Lists.newArrayList();
}
+ /**
+ * Used by new optimizer.
+ */
+ public SortInfo(List<Expr> orderingExprs,
+ List<Boolean> isAscOrder,
+ List<Boolean> nullsFirstParams,
+ TupleDescriptor sortTupleDesc) {
+ this.orderingExprs = orderingExprs;
+ this.isAscOrder = isAscOrder;
+ this.nullsFirstParams = nullsFirstParams;
+ this.sortTupleDesc = sortTupleDesc;
+ }
+
/**
* C'tor for cloning.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java
new file mode 100644
index 0000000000..063f92aae7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanOperatorVisitor.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids;
+
+import org.apache.doris.nereids.operators.Operator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+/**
+ * Base class for visitors that process logical and physical plans.
+ *
+ * @param <R> Return type of each visit method.
+ * @param <C> Context type.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class PlanOperatorVisitor<R, C> {
+
+ public abstract R visit(Plan<? extends Plan, ? extends Operator> plan, C context);
+
+ public R visitPhysicalAggregationPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation> aggPlan,
+ C context) {
+ return null;
+ }
+
+ public R visitPhysicalOlapScanPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan> olapScanPlan,
+ C context) {
+ return null;
+ }
+
+ public R visitPhysicalSortPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalSort> sortPlan,
+ C context) {
+ return null;
+ }
+
+ public R visitPhysicalHashJoinPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalHashJoin> hashJoinPlan,
+ C context) {
+ return null;
+ }
+
+ public R visitPhysicalProject(PhysicalPlan<? extends PhysicalPlan, PhysicalProject> projectPlan,
+ C context) {
+ return null;
+ }
+
+ public R visitPhysicalFilter(PhysicalPlan<? extends PhysicalPlan, PhysicalFilter> filterPlan,
+ C context) {
+ return null;
+ }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java
index 54064a6f2f..137317cbeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/AbstractOperator.java
@@ -17,6 +17,9 @@
package org.apache.doris.nereids.operators;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.trees.plans.Plan;
+
import java.util.Objects;
/**
@@ -24,13 +27,37 @@ import java.util.Objects;
*/
public abstract class AbstractOperator<TYPE extends AbstractOperator<TYPE>> implements Operator<TYPE> {
protected final OperatorType type;
+ protected final long limited;
public AbstractOperator(OperatorType type) {
this.type = Objects.requireNonNull(type, "type can not be null");
+ this.limited = -1;
+ }
+
+ public AbstractOperator(OperatorType type, long limited) {
+ this.type = Objects.requireNonNull(type, "type can not be null"); // same null contract as the one-arg ctor
+ this.limited = limited;
}
@Override
public OperatorType getType() {
return type;
}
+
+ /**
+ * Child operators should override this method.
+ * for example:
+ * <code>
+ * visitor.visitPhysicalOlapScanPlan(
+ * (PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan>) plan, context);
+ * </code>
+ */
+ public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+ return null;
+ }
+
+ public long getLimited() {
+ return limited;
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java
index bb2450ce35..efecb3a201 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/Operator.java
@@ -17,8 +17,10 @@
package org.apache.doris.nereids.operators;
+import org.apache.doris.nereids.PlanOperatorVisitor;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.trees.TreeNode;
+import org.apache.doris.nereids.trees.plans.Plan;
/**
* interface for all concrete operator.
@@ -27,4 +29,7 @@ public interface Operator<TYPE extends Operator<TYPE>> {
OperatorType getType();
<NODE_TYPE extends TreeNode> NODE_TYPE toTreeNode(GroupExpression groupExpression);
+
+ public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context);
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
index b506839854..f1a183521f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/OperatorType.java
@@ -40,6 +40,10 @@ public enum OperatorType {
PHYSICAL_PROJECT,
PHYSICAL_FILTER,
PHYSICAL_BROADCAST_HASH_JOIN,
+ PHYSICAL_AGGREGATION,
+ PHYSICAL_SORT,
+ PHYSICAL_HASH_JOIN,
+ PHYSICAL_EXCHANGE,
// pattern
ANY,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java
new file mode 100644
index 0000000000..c365b06d89
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/AggPhase.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+
+/**
+ * Represents the different phases of aggregation and maps each one to the
+ * corresponding agg phase enum of the legacy optimizer.
+ */
+public enum AggPhase {
+ FIRST("FIRST", AggregateInfo.AggPhase.FIRST),
+ FIRST_MERGE("FIRST_MERGE", AggregateInfo.AggPhase.FIRST_MERGE),
+ SECOND("SECOND", AggregateInfo.AggPhase.SECOND),
+ SECOND_MERGE("SECOND_MERGE", AggregateInfo.AggPhase.SECOND_MERGE);
+
+ private final String name;
+
+ private final AggregateInfo.AggPhase execAggPhase;
+
+ AggPhase(String name, AggregateInfo.AggPhase execAggPhase) {
+ this.name = name;
+ this.execAggPhase = execAggPhase;
+ }
+
+ public boolean isMerge() {
+ return this == FIRST_MERGE || this == SECOND_MERGE;
+ }
+
+ public AggregateInfo.AggPhase toExec() {
+ return this.execAggPhase;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java
index f22634d649..9e2b8d41e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/JoinType.java
@@ -59,7 +59,7 @@ public enum JoinType {
* @return legacy join type in Doris
* @throws AnalysisException throw this exception when input join type cannot convert to legacy join type in Doris
*/
- public static JoinOperator toJoinOperator(JoinType joinType) throws AnalysisException {
+ public static JoinOperator toJoinOperator(JoinType joinType) {
switch (joinType) {
case INNER_JOIN:
return JoinOperator.INNER_JOIN;
@@ -80,7 +80,7 @@ public enum JoinType {
case CROSS_JOIN:
return JoinOperator.CROSS_JOIN;
default:
- throw new AnalysisException("Not support join operator: " + joinType.name());
+ throw new RuntimeException("Unexpected join operator: " + joinType.name());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java
new file mode 100644
index 0000000000..feb72bd837
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalAggregation.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.operators.plans.AggPhase;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+/**
+ * Physical aggregation plan operator.
+ */
+public class PhysicalAggregation extends PhysicalUnaryOperator<PhysicalAggregation, PhysicalPlan> {
+
+ private final List<Expression> groupByExprList;
+
+ private final List<Expression> aggExprList;
+
+ private final List<Expression> partitionExprList;
+
+ private final AggPhase aggPhase;
+
+ private final boolean usingStream;
+
+ /**
+ * Constructor of PhysicalAggregation.
+ *
+ * @param groupByExprList group by expr list.
+ * @param aggExprList agg expr list.
+ * @param partitionExprList partition expr list, used for analytic agg.
+ * @param usingStream whether it's stream agg.
+ */
+ public PhysicalAggregation(List<Expression> groupByExprList, List<Expression> aggExprList,
+ List<Expression> partitionExprList, AggPhase aggPhase, boolean usingStream) {
+ super(OperatorType.PHYSICAL_AGGREGATION);
+ this.groupByExprList = groupByExprList;
+ this.aggExprList = aggExprList;
+ this.partitionExprList = partitionExprList;
+ this.aggPhase = aggPhase;
+ this.usingStream = usingStream;
+ }
+
+ public List<Expression> getGroupByExprList() {
+ return groupByExprList;
+ }
+
+ public List<Expression> getAggExprList() {
+ return aggExprList;
+ }
+
+ public AggPhase getAggPhase() {
+ return aggPhase;
+ }
+
+ public boolean isUsingStream() {
+ return usingStream;
+ }
+
+ public List<Expression> getPartitionExprList() {
+ return partitionExprList;
+ }
+
+ @Override
+ public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+ return visitor.visitPhysicalAggregationPlan(
+ (PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation>) plan, context);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java
index e3a8229b90..f62b69e3ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalFilter.java
@@ -17,9 +17,11 @@
package org.apache.doris.nereids.operators.plans.physical;
+import org.apache.doris.nereids.PlanOperatorVisitor;
import org.apache.doris.nereids.operators.OperatorType;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import java.util.Objects;
@@ -50,4 +52,10 @@ public class PhysicalFilter<INPUT_TYPE extends Plan>
}
return "Filter (" + cond + ")";
}
+
+ @Override
+ public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+ return visitor.visitPhysicalFilter((PhysicalPlan<? extends PhysicalPlan, PhysicalFilter>) plan,
+ context);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java
new file mode 100644
index 0000000000..41785dbab8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalHashJoin.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+/**
+ * Physical hash join plan operator.
+ */
+public class PhysicalHashJoin extends PhysicalBinaryOperator<PhysicalHashJoin, PhysicalPlan, PhysicalPlan> {
+
+ private final JoinType joinType;
+
+ private final Expression predicate;
+
+ /**
+ * Constructor of PhysicalHashJoin.
+ *
+ * @param joinType Which join type, left semi join, inner join...
+ * @param predicate join condition.
+ */
+ public PhysicalHashJoin(JoinType joinType, Expression predicate) {
+ super(OperatorType.PHYSICAL_HASH_JOIN);
+ this.joinType = joinType;
+ this.predicate = predicate;
+ }
+
+ public JoinType getJoinType() {
+ return joinType;
+ }
+
+ public Expression getPredicate() {
+ return predicate;
+ }
+
+ @Override
+ public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+ return visitor.visitPhysicalHashJoinPlan(
+ (PhysicalPlan<? extends PhysicalPlan, PhysicalHashJoin>) plan, context);
+ }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java
index 62d9b3babc..c56ae50c81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOlapScan.java
@@ -19,7 +19,10 @@ package org.apache.doris.nereids.operators.plans.physical;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
+import org.apache.doris.nereids.PlanOperatorVisitor;
import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import com.clearspring.analytics.util.Lists;
import org.apache.commons.lang3.StringUtils;
@@ -34,6 +37,8 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> {
private final List<Long> selectedTabletId;
private final List<Long> selectedPartitionId;
+ private final OlapTable olapTable;
+
/**
* Constructor for PhysicalOlapScan.
*
@@ -41,7 +46,8 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> {
* @param qualifier table's name
*/
public PhysicalOlapScan(OlapTable olapTable, List<String> qualifier) {
- super(OperatorType.PHYSICAL_OLAP_SCAN, olapTable, qualifier);
+ super(OperatorType.PHYSICAL_OLAP_SCAN, qualifier);
+ this.olapTable = olapTable;
this.selectedIndexId = olapTable.getBaseIndexId();
this.selectedTabletId = Lists.newArrayList();
this.selectedPartitionId = olapTable.getPartitionIds();
@@ -62,12 +68,21 @@ public class PhysicalOlapScan extends PhysicalScan<PhysicalOlapScan> {
return selectedPartitionId;
}
+ public OlapTable getTable() {
+ return olapTable;
+ }
+
@Override
public String toString() {
- return "Scan Olap Table " + StringUtils.join(qualifier, ".") + "." + table.getName()
- + " (selected index id: " + selectedTabletId
- + ", selected partition ids: " + selectedPartitionId
- + ", selected tablet ids: " + selectedTabletId
- + ")";
+ return "Scan Olap Table " + StringUtils.join(qualifier, ".") + "." + olapTable.getName()
+ + " (selected index id: " + selectedIndexId + ", selected partition ids: " + selectedPartitionId
+ + ", selected tablet ids: " + selectedTabletId + ")";
}
+
+ @Override
+ public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+ return visitor.visitPhysicalOlapScanPlan(
+ (PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan>) plan, context);
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java
index 4982ec342d..dd5aa79a9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalOperator.java
@@ -28,5 +28,7 @@ import java.util.List;
* interface for all concrete physical operator.
*/
public interface PhysicalOperator<TYPE extends PhysicalOperator<TYPE>> extends PlanOperator<TYPE> {
+
List<Slot> computeOutputs(LogicalProperties logicalProperties, Plan... inputs);
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java
index bd74fc813b..d5046083bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalProject.java
@@ -17,9 +17,11 @@
package org.apache.doris.nereids.operators.plans.physical;
+import org.apache.doris.nereids.PlanOperatorVisitor;
import org.apache.doris.nereids.operators.OperatorType;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.commons.lang3.StringUtils;
@@ -47,4 +49,10 @@ public class PhysicalProject<INPUT_TYPE extends Plan>
public String toString() {
return "Project (" + StringUtils.join(projects, ", ") + ")";
}
+
+ @Override
+ public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+ return visitor.visitPhysicalProject(
+ (PhysicalPlan<? extends PhysicalPlan, PhysicalProject>) plan, context);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java
index 77ce5570b2..10d434c7e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalScan.java
@@ -17,7 +17,6 @@
package org.apache.doris.nereids.operators.plans.physical;
-import org.apache.doris.catalog.Table;
import org.apache.doris.nereids.operators.OperatorType;
import java.util.List;
@@ -26,26 +25,19 @@ import java.util.Objects;
/**
* Abstract class for all physical scan operator.
*/
-public abstract class PhysicalScan<TYPE extends PhysicalScan<TYPE>>
- extends PhysicalLeafOperator<TYPE> {
+public abstract class PhysicalScan<TYPE extends PhysicalScan<TYPE>> extends PhysicalLeafOperator<TYPE> {
+
- protected final Table table;
protected final List<String> qualifier;
/**
* Constructor for PhysicalScan.
*
* @param type node type
- * @param table scan table
* @param qualifier table's name
*/
- public PhysicalScan(OperatorType type, Table table, List<String> qualifier) {
+ public PhysicalScan(OperatorType type, List<String> qualifier) {
super(type);
- this.table = Objects.requireNonNull(table, "table can not be null");
this.qualifier = Objects.requireNonNull(qualifier, "qualifier can not be null");
}
-
- public Table getTable() {
- return table;
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java
new file mode 100644
index 0000000000..1dcb3d6540
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/operators/plans/physical/PhysicalSort.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.operators.plans.physical;
+
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.OperatorType;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+
+import java.util.List;
+
+/**
+ * Physical sort plan operator.
+ */
+public class PhysicalSort extends PhysicalUnaryOperator<PhysicalSort, PhysicalPlan> {
+
+ private final int offset;
+
+ private final int limit;
+
+ private final List<OrderKey> orderList;
+
+ private final boolean useTopN;
+
+ /**
+ * Constructor of PhysicalSort.
+ */
+ public PhysicalSort(int offset, int limit, List<OrderKey> orderList, boolean useTopN) {
+ super(OperatorType.PHYSICAL_SORT);
+ this.offset = offset;
+ this.limit = limit;
+ this.orderList = orderList;
+ this.useTopN = useTopN;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public List<OrderKey> getOrderList() {
+ return orderList;
+ }
+
+ public boolean isUseTopN() {
+ return useTopN;
+ }
+
+ public boolean hasLimit() {
+ return limit > -1;
+ }
+
+ @Override
+ public <R, C> R accept(PlanOperatorVisitor<R, C> visitor, Plan<?, ?> plan, C context) {
+ return visitor.visitPhysicalSortPlan((PhysicalPlan<? extends PhysicalPlan, PhysicalSort>) plan,
+ context);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
similarity index 62%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
index fe0db693a5..4122de6901 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
@@ -17,9 +17,27 @@
package org.apache.doris.nereids.properties;
+import org.apache.doris.planner.DataPartition;
+
/**
- * Physical properties used in cascades.
+ * Base class for data distribution.
*/
-public class PhysicalProperties {
+public class DistributionSpec {
+
+ // Legacy planner partition wrapped by this spec; remains null when the no-arg constructor is used.
+ private DataPartition dataPartition;
+
+ /** Creates a spec without a data partition; one may be supplied later via setDataPartition. */
+ public DistributionSpec() {
+ }
+
+ /** Creates a spec that wraps the given data partition. */
+ public DistributionSpec(DataPartition dataPartition) {
+ this.dataPartition = dataPartition;
+ }
+
+ /** Returns the wrapped data partition, or null if none has been set. */
+ public DataPartition getDataPartition() {
+ return dataPartition;
+ }
+ /** Replaces the wrapped data partition. */
+ public void setDataPartition(DataPartition dataPartition) {
+ this.dataPartition = dataPartition;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
similarity index 68%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
index fe0db693a5..ade20758ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/HashDistributionSpec.java
@@ -17,9 +17,25 @@
package org.apache.doris.nereids.properties;
+import org.apache.doris.analysis.HashDistributionDesc;
+
/**
- * Physical properties used in cascades.
+ * Describe hash distribution.
*/
-public class PhysicalProperties {
+public class HashDistributionSpec extends DistributionSpec {
+
+ /**
+ * Enums for concrete shuffle type.
+ */
+ public enum ShuffleType {
+ COLOCATE,
+ BUCKET,
+ AGG,
+ NORMAL
+ }
+
+ // NOTE(review): neither field below is assigned or read anywhere in this class yet
+ // (no constructor, no accessors), so both are always null for now.
+ private ShuffleType shuffleType;
+
+ private HashDistributionDesc hashDistributionDesc;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
similarity index 54%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
index fe0db693a5..b01862dff6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderKey.java
@@ -17,9 +17,39 @@
package org.apache.doris.nereids.properties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
/**
- * Physical properties used in cascades.
+ * Represents the order key of a statement.
*/
-public class PhysicalProperties {
+public class OrderKey {
+
+ // Expression to order by.
+ private Expression expr;
+
+ // True for ascending order, false for descending.
+ private boolean isAsc;
+
+ // True for "NULLS FIRST", false for "NULLS LAST".
+ private boolean nullFirst;
+
+ /**
+ * Constructor of OrderKey.
+ *
+ * @param expr expression to order by
+ * @param isAsc true for ascending order, false for descending
+ * @param nullFirst true if "NULLS FIRST", false if "NULLS LAST"; this is a primitive boolean,
+ *     so an "unspecified" state cannot be represented here
+ */
+ public OrderKey(Expression expr, boolean isAsc, boolean nullFirst) {
+ this.expr = expr;
+ this.isAsc = isAsc;
+ this.nullFirst = nullFirst;
+ }
+
+ public Expression getExpr() {
+ return expr;
+ }
+
+ public boolean isAsc() {
+ return isAsc;
+ }
+ public boolean isNullFirst() {
+ return nullFirst;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index fe0db693a5..abe767c40e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -22,4 +22,13 @@ package org.apache.doris.nereids.properties;
*/
public class PhysicalProperties {
+ // Distribution requirement/description of these properties; null until setDistributionDesc is called.
+ private DistributionSpec distributionDesc;
+
+ /** Returns the distribution spec, or null if none has been set. */
+ public DistributionSpec getDistributionDesc() {
+ return distributionDesc;
+ }
+
+ /** Sets the distribution spec. */
+ public void setDistributionDesc(DistributionSpec distributionDesc) {
+ this.distributionDesc = distributionDesc;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
similarity index 89%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
index fe0db693a5..f54ed12a19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RandomDistributionDesc.java
@@ -18,8 +18,8 @@
package org.apache.doris.nereids.properties;
/**
- * Physical properties used in cascades.
+ * Describe random distribution.
*/
-public class PhysicalProperties {
+public class RandomDistributionDesc extends DistributionSpec {
+ // NOTE(review): the sibling classes use the "Spec" suffix (DistributionSpec, HashDistributionSpec);
+ // consider renaming this one to RandomDistributionSpec for consistency.
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java
similarity index 67%
copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java
index 16d23ffd98..40016180f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ExpressionConverter.java
@@ -17,20 +17,18 @@
package org.apache.doris.nereids.trees.expressions;
-import org.apache.doris.nereids.trees.NodeType;
+import org.apache.doris.analysis.Expr;
/**
- * Abstract class for all slot in expression.
+ * Used to convert expression of new optimizer to stale expr.
*/
-public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpression<EXPR_TYPE>
- implements LeafExpression<EXPR_TYPE> {
+public class ExpressionConverter {
- public Slot(NodeType type) {
- super(type);
- }
+ // Shared converter instance used by callers as ExpressionConverter.converter.
+ // NOTE(review): the field is public, static and non-final, so any caller can reassign it.
+ public static ExpressionConverter converter = new ExpressionConverter();
- @Override
- public Slot toSlot() {
- return this;
+ // TODO: implement this. In addition, when the expression is a slot, the slot id should be
+ // set on the converted org.apache.doris.analysis.Expr.
+ // Currently a stub: always returns null regardless of the input expression.
+ public Expr convert(Expression expression) {
+ return null;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java
new file mode 100644
index 0000000000..88e4bc6ba5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/FunctionCallExpression.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions;
+
+import org.apache.doris.analysis.FunctionName;
+import org.apache.doris.catalog.Function;
+import org.apache.doris.nereids.trees.NodeType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Temporary definition of a function call expression: a function name, the catalog
+ * function object and the argument expressions.
+ */
+public class FunctionCallExpression extends Expression<FunctionCallExpression> {
+
+    private final FunctionName functionName;
+
+    private final Function fn;
+
+    private final List<Expression> params;
+
+    /**
+     * Constructor of FunctionCallExpression.
+     *
+     * @param functionName name of the called function
+     * @param fn catalog function object
+     * @param children argument expressions; passed to the super constructor and also kept
+     *     as the parameter list returned by {@link #getParams()}
+     */
+    public FunctionCallExpression(FunctionName functionName,
+            Function fn, Expression... children) {
+        super(NodeType.EXPRESSION, children);
+        this.functionName = functionName;
+        this.fn = fn;
+        this.params = Arrays.stream(children).collect(Collectors.toList());
+    }
+
+    public FunctionName getFunctionName() {
+        return functionName;
+    }
+
+    public Function getFn() {
+        return fn;
+    }
+
+    public List<Expression> getParams() {
+        return params;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
index 16d23ffd98..3143b807f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Slot.java
@@ -25,6 +25,13 @@ import org.apache.doris.nereids.trees.NodeType;
public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpression<EXPR_TYPE>
implements LeafExpression<EXPR_TYPE> {
+ // Identifier of this slot; only assigned by the (type, id, children) constructor below.
+ private int id;
+
+ /**
+ * Creates a slot with an explicit id.
+ *
+ * @param type node type forwarded to the super constructor
+ * @param id identifier stored for this slot
+ * @param children child expressions forwarded to the super constructor
+ */
+ public Slot(NodeType type, int id, Expression... children) {
+ super(type, children);
+ this.id = id;
+ }
+
public Slot(NodeType type) {
super(type);
}
@@ -33,4 +40,8 @@ public abstract class Slot<EXPR_TYPE extends Slot<EXPR_TYPE>> extends NamedExpre
public Slot toSlot() {
return this;
}
+
+ /** Returns the slot id; this is 0 for slots created through the single-argument constructor. */
+ public int getId() {
+ return id;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
index f779b9eaba..acecad51ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java
@@ -18,7 +18,6 @@
package org.apache.doris.nereids.trees.expressions;
import org.apache.doris.catalog.Column;
-import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.trees.NodeType;
import org.apache.doris.nereids.types.DataType;
@@ -80,12 +79,12 @@ public class SlotReference extends Slot<SlotReference> {
}
@Override
- public DataType getDataType() throws UnboundException {
+ public DataType getDataType() {
return dataType;
}
@Override
- public boolean nullable() throws UnboundException {
+ public boolean nullable() {
return nullable;
}
@@ -123,4 +122,9 @@ public class SlotReference extends Slot<SlotReference> {
public int hashCode() {
return Objects.hash(exprId, name, qualifier, nullable);
}
+
+ // TODO: return real org.apache.doris.catalog.Column
+ // Currently a stub that always returns null, so callers must tolerate a null column.
+ public Column getColumn() {
+ return null;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java
new file mode 100644
index 0000000000..3c4384b671
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PhysicalPlanTranslator.java
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.nereids.PlanOperatorVisitor;
+import org.apache.doris.nereids.operators.AbstractOperator;
+import org.apache.doris.nereids.operators.Operator;
+import org.apache.doris.nereids.operators.plans.JoinType;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalAggregation;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalOperator;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.operators.plans.physical.PhysicalSort;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.ExpressionConverter;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.AggregationNode;
+import org.apache.doris.planner.CrossJoinNode;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.SortNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Translates the physical plan generated by the new optimizer into plan fragments.
+ */
+@SuppressWarnings("rawtypes")
+public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, PlanContext> {
+
+ /**
+ * Entry point of the translation: dispatches the given physical plan to {@link #visit}.
+ *
+ * <p>NOTE(review): the PlanFragment returned by visit is discarded here; callers can only
+ * observe fragments that were registered on the context during translation.
+ *
+ * @param physicalPlan root of the physical plan to translate
+ * @param context translation context supplying ids and descriptors
+ */
+ public void translatePlan(PhysicalPlan<? extends PhysicalPlan, ? extends AbstractOperator> physicalPlan,
+ PlanContext context) {
+ visit(physicalPlan, context);
+ }
+
+ /**
+ * Generic dispatch: hands the plan to its operator's accept method so that the matching
+ * visitPhysicalXxx method of this translator is invoked.
+ *
+ * <p>The operator is cast to PhysicalOperator, so a plan holding any other kind of operator
+ * fails with a ClassCastException.
+ */
+ @Override
+ public PlanFragment visit(Plan<? extends Plan, ? extends Operator> plan, PlanContext context) {
+ PhysicalOperator<?> operator = (PhysicalOperator<?>) plan.getOperator();
+ return operator.accept(this, plan, context);
+ }
+
+ /**
+ * Translates a physical aggregation into an {@link AggregationNode} that becomes the new root
+ * of the child's fragment.
+ *
+ * <p>Translate in following steps:
+ * 1. translate the child plan to obtain the input fragment;
+ * 2. build the output tuple descriptor from the output slots of the aggregation;
+ * 3. convert the group-by, aggregate and partition expressions to legacy exprs;
+ * 4. create the AggregationNode for the requested phase and install it as the fragment root.
+ *
+ * @param aggPlan physical plan node holding the PhysicalAggregation operator
+ * @param context translation context supplying ids and descriptors
+ * @return the child's fragment, now rooted at the aggregation node
+ * @throws RuntimeException if the aggregation phase is neither FIRST nor FIRST_MERGE
+ */
+ @Override
+ public PlanFragment visitPhysicalAggregationPlan(
+         PhysicalPlan<? extends PhysicalPlan, PhysicalAggregation> aggPlan, PlanContext context) {
+
+     PlanFragment inputPlanFragment = visit(
+             (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) aggPlan.child(0), context);
+
+     List<Slot> slotList = aggPlan.getOutput();
+     TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null);
+     PhysicalAggregation physicalAggregation = (PhysicalAggregation) aggPlan.getOperator();
+     AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase().toExec();
+
+     List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList();
+     ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream()
+             .map(e -> ExpressionConverter.converter.convert(e))
+             .collect(Collectors.toCollection(ArrayList::new));
+
+     List<Expression> aggExpressionList = physicalAggregation.getAggExprList();
+     // TODO: agg function could be other expr type either
+     ArrayList<FunctionCallExpr> execAggExpressions = aggExpressionList.stream()
+             .map(e -> (FunctionCallExpr) ExpressionConverter.converter.convert(e))
+             .collect(Collectors.toCollection(ArrayList::new));
+
+     // Partition expressions are arbitrary exprs (typically plain slots), so they must not be
+     // cast to FunctionCallExpr: that cast would fail for every non-function partition key.
+     List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList();
+     List<Expr> execPartitionExpressions = partitionExpressionList.stream()
+             .map(e -> ExpressionConverter.converter.convert(e))
+             .collect(Collectors.toList());
+
+     // todo: support DISTINCT
+     AggregateInfo aggInfo;
+     AggregationNode aggregationNode;
+     switch (phase) {
+         case FIRST:
+             aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                     outputTupleDesc, AggregateInfo.AggPhase.FIRST);
+             aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(),
+                     aggInfo);
+             aggregationNode.unsetNeedsFinalize();
+             aggregationNode.setUseStreamingPreagg(physicalAggregation.isUsingStream());
+             aggregationNode.setIntermediateTuple();
+             if (!partitionExpressionList.isEmpty()) {
+                 inputPlanFragment.setOutputPartition(DataPartition.hashPartitioned(execPartitionExpressions));
+             }
+             break;
+         case FIRST_MERGE:
+             aggInfo = AggregateInfo.create(execGroupingExpressions, execAggExpressions, outputTupleDesc,
+                     outputTupleDesc, AggregateInfo.AggPhase.FIRST_MERGE);
+             aggregationNode = new AggregationNode(context.nextNodeId(), inputPlanFragment.getPlanRoot(),
+                     aggInfo);
+             break;
+         default:
+             throw new RuntimeException("Unsupported yet");
+     }
+     inputPlanFragment.setPlanRoot(aggregationNode);
+     return inputPlanFragment;
+ }
+
+ /**
+ * Translates an OLAP scan into an OlapScanNode placed in a fresh RANDOM-partitioned fragment,
+ * and registers that fragment on the context.
+ */
+ @Override
+ public PlanFragment visitPhysicalOlapScanPlan(
+         PhysicalPlan<? extends PhysicalPlan, PhysicalOlapScan> olapScanPlan, PlanContext context) {
+     // Build the scan node on top of a tuple descriptor derived from the scan's output slots.
+     List<Slot> outputSlots = olapScanPlan.getOutput();
+     PhysicalOlapScan scanOperator = olapScanPlan.getOperator();
+     OlapTable table = scanOperator.getTable();
+     TupleDescriptor scanTuple = generateTupleDesc(outputSlots, context, table);
+     OlapScanNode scanNode = new OlapScanNode(context.nextNodeId(), scanTuple, table.getName());
+
+     // Wrap the scan node in its own fragment and make it visible through the context.
+     PlanFragment scanFragment = new PlanFragment(context.nextFragmentId(), scanNode, DataPartition.RANDOM);
+     context.addPlanFragment(scanFragment);
+     return scanFragment;
+ }
+
+ /**
+ * Translates a physical sort into a SortNode on top of the child's fragment and, when the
+ * child fragment is partitioned, adds an unpartitioned merge fragment above it.
+ *
+ * <p>The SortNode is always created and installed as the root of the child fragment first.
+ * For an unpartitioned child that fragment is returned directly. Otherwise a parent fragment
+ * with a merging exchange is created: the exchange applies limit and offset, while the child
+ * sort node returns at most (offset + limit) rows and applies no offset itself.
+ *
+ * @param sortPlan physical plan node holding the PhysicalSort operator
+ * @param context translation context supplying ids and descriptors
+ * @return the fragment that produces the sorted output
+ */
+ @Override
+ public PlanFragment visitPhysicalSortPlan(PhysicalPlan<? extends PhysicalPlan, PhysicalSort> sortPlan,
+         PlanContext context) {
+     PlanFragment childFragment = visit(
+             (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) sortPlan.child(0), context);
+     PhysicalSort physicalSort = sortPlan.getOperator();
+     long limit = physicalSort.getLimit();
+     long offset = physicalSort.getOffset();
+
+     List<Expr> execOrderingExprList = Lists.newArrayList();
+     List<Boolean> ascOrderList = Lists.newArrayList();
+     List<Boolean> nullsFirstParamList = Lists.newArrayList();
+
+     List<OrderKey> orderKeyList = physicalSort.getOrderList();
+     orderKeyList.forEach(k -> {
+         execOrderingExprList.add(ExpressionConverter.converter.convert(k.getExpr()));
+         ascOrderList.add(k.isAsc());
+         nullsFirstParamList.add(k.isNullFirst());
+     });
+
+     List<Slot> outputList = sortPlan.getOutput();
+     TupleDescriptor tupleDesc = generateTupleDesc(outputList, context, null);
+     SortInfo sortInfo = new SortInfo(execOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
+
+     PlanNode childNode = childFragment.getPlanRoot();
+     SortNode sortNode = new SortNode(context.nextNodeId(), childNode, sortInfo, physicalSort.isUseTopN(),
+             physicalSort.hasLimit(), physicalSort.getOffset());
+     // Install the sort node as the fragment root BEFORE deciding whether a merge fragment is
+     // needed; otherwise an unpartitioned child would lose the sort entirely and the exchange
+     // created below would wrap the unsorted child instead of the sort node.
+     childFragment.setPlanRoot(sortNode);
+     if (!childFragment.isPartitioned()) {
+         return childFragment;
+     }
+
+     PlanFragment mergeFragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED, context);
+     ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot();
+     exchNode.unsetLimit();
+     if (physicalSort.hasLimit()) {
+         exchNode.setLimit(limit);
+     }
+     exchNode.setMergeInfo(sortNode.getSortInfo(), offset);
+
+     // Child nodes should not process the offset. If there is a limit,
+     // the child nodes need only return (offset + limit) rows.
+     SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
+     Preconditions.checkState(sortNode == childSortNode);
+     if (sortNode.hasLimit()) {
+         childSortNode.unsetLimit();
+         childSortNode.setLimit(limit + offset);
+     }
+     childSortNode.setOffset(0);
+     return mergeFragment;
+ }
+
+ /**
+ * Translates a physical hash join.
+ *
+ * <p>A CROSS join, or an INNER join without any equality conjunct, becomes a CrossJoinNode in
+ * the left fragment whose second child is an exchange fed by the right fragment. Every other
+ * join becomes a PARTITIONED HashJoinNode in a new fragment whose two children are exchanges
+ * fed by the left and right fragments respectively.
+ *
+ * <p>TODO: support broadcast join / co-locate / bucket shuffle join later
+ *
+ * @param hashJoinPlan physical plan node holding the PhysicalHashJoin operator
+ * @param context translation context supplying ids and descriptors
+ * @return the fragment whose root is the created join node
+ */
+ @Override
+ public PlanFragment visitPhysicalHashJoinPlan(
+         PhysicalPlan<? extends PhysicalPlan, PhysicalHashJoin> hashJoinPlan, PlanContext context) {
+     PlanFragment leftFragment = visit(
+             (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) hashJoinPlan.child(0), context);
+     // The right input is child(1); translating child(0) twice would join the left side with itself.
+     PlanFragment rightFragment = visit(
+             (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) hashJoinPlan.child(1), context);
+     PhysicalHashJoin physicalHashJoin = hashJoinPlan.getOperator();
+     Expression predicateExpr = physicalHashJoin.getPredicate();
+     List<Expression> eqExprList = Utils.getEqConjuncts(hashJoinPlan.child(0).getOutput(),
+             hashJoinPlan.child(1).getOutput(), predicateExpr);
+     JoinType joinType = physicalHashJoin.getJoinType();
+
+     PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
+     PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();
+
+     if (joinType.equals(JoinType.CROSS_JOIN)
+             || (joinType.equals(JoinType.INNER_JOIN) && eqExprList.isEmpty())) {
+         CrossJoinNode crossJoinNode = new CrossJoinNode(context.nextNodeId(), leftFragmentPlanRoot,
+                 rightFragmentPlanRoot, null);
+         crossJoinNode.setLimit(physicalHashJoin.getLimited());
+         List<Expr> conjuncts = Utils.extractConjuncts(predicateExpr).stream()
+                 .map(e -> ExpressionConverter.converter.convert(e))
+                 .collect(Collectors.toCollection(ArrayList::new));
+         crossJoinNode.addConjuncts(conjuncts);
+         ExchangeNode exchangeNode = new ExchangeNode(context.nextNodeId(), rightFragmentPlanRoot, false);
+         exchangeNode.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+         exchangeNode.setFragment(leftFragment);
+         // The exchange replaces the right input of the join node itself; it must not be
+         // attached to the left input's root, which is an unrelated node.
+         crossJoinNode.setChild(0, leftFragmentPlanRoot);
+         crossJoinNode.setChild(1, exchangeNode);
+         rightFragment.setDestination(exchangeNode);
+         leftFragment.setPlanRoot(crossJoinNode);
+         return leftFragment;
+     }
+
+     List<Expression> expressionList = Utils.extractConjuncts(predicateExpr);
+     expressionList.removeAll(eqExprList);
+     List<Expr> execOtherConjunctList = expressionList.stream().map(e -> ExpressionConverter.converter.convert(e))
+             .collect(Collectors.toCollection(ArrayList::new));
+     List<Expr> execEqConjunctList = eqExprList.stream().map(e -> ExpressionConverter.converter.convert(e))
+             .collect(Collectors.toCollection(ArrayList::new));
+
+     HashJoinNode hashJoinNode = new HashJoinNode(context.nextNodeId(), leftFragmentPlanRoot, rightFragmentPlanRoot,
+             JoinType.toJoinOperator(joinType), execEqConjunctList, execOtherConjunctList);
+
+     // Each input is shipped to the join through its own exchange node.
+     ExchangeNode leftExch = new ExchangeNode(context.nextNodeId(), leftFragmentPlanRoot, false);
+     leftExch.setNumInstances(leftFragmentPlanRoot.getNumInstances());
+     ExchangeNode rightExch = new ExchangeNode(context.nextNodeId(), rightFragmentPlanRoot, false);
+     rightExch.setNumInstances(rightFragmentPlanRoot.getNumInstances());
+     hashJoinNode.setChild(0, leftExch);
+     hashJoinNode.setChild(1, rightExch);
+     hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
+     hashJoinNode.setLimit(physicalHashJoin.getLimited());
+
+     // Create the join fragment before wiring destinations, mirroring createParentFragment:
+     // the exchanges need to belong to a fragment when they become a destination.
+     PlanFragment joinFragment = new PlanFragment(context.nextFragmentId(), hashJoinNode,
+             leftFragment.getDataPartition());
+     leftFragment.setDestination(leftExch);
+     rightFragment.setDestination(rightExch);
+     return joinFragment;
+ }
+
+ /**
+ * Translates a projection by translating its child only.
+ *
+ * <p>NOTE(review): the project operator itself contributes nothing to the resulting fragment
+ * here; the child's fragment is returned unchanged.
+ */
+ @Override
+ public PlanFragment visitPhysicalProject(
+ PhysicalPlan<? extends PhysicalPlan, PhysicalProject> projectPlan, PlanContext context) {
+ return visit((PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) projectPlan.child(0), context);
+ }
+
+ /**
+ * Translates a filter by pushing each conjunct of its predicate, converted to a legacy expr,
+ * onto the root node of the child's fragment. No new plan node is created.
+ */
+ @Override
+ public PlanFragment visitPhysicalFilter(PhysicalPlan<? extends PhysicalPlan, PhysicalFilter> filterPlan,
+         PlanContext context) {
+     PlanFragment inputFragment = visit(
+             (PhysicalPlan<? extends PhysicalPlan, ? extends PhysicalOperator>) filterPlan.child(0), context);
+     PlanNode fragmentRoot = inputFragment.getPlanRoot();
+     PhysicalFilter filter = filterPlan.getOperator();
+     List<Expression> conjunctList = Utils.extractConjuncts(filter.getPredicates());
+     for (Expression conjunct : conjunctList) {
+         fragmentRoot.addConjunct(ExpressionConverter.converter.convert(conjunct));
+     }
+     return inputFragment;
+ }
+
+ /**
+ * Creates a tuple descriptor for the given table and adds one materialized slot descriptor per
+ * slot, reusing the slot's id and copying its column and catalog type. Every slot is expected
+ * to be a SlotReference.
+ */
+ private TupleDescriptor generateTupleDesc(List<Slot> slotList, PlanContext context, Table table) {
+     TupleDescriptor tuple = context.generateTupleDesc();
+     tuple.setTable(table);
+     for (Slot outputSlot : slotList) {
+         SlotReference reference = (SlotReference) outputSlot;
+         SlotDescriptor descriptor = context.addSlotDesc(tuple, outputSlot.getId());
+         descriptor.setColumn(reference.getColumn());
+         descriptor.setType(reference.getDataType().toCatalogDataType());
+         descriptor.setIsMaterialized(true);
+     }
+     return tuple;
+ }
+
+ /**
+ * Creates a new fragment, partitioned as requested, whose root is an exchange node reading
+ * from the child fragment's root; the child fragment is pointed at that exchange and its
+ * output partition is set to the parent's partition.
+ */
+ private PlanFragment createParentFragment(PlanFragment childFragment, DataPartition parentPartition,
+         PlanContext ctx) {
+     PlanNode childRoot = childFragment.getPlanRoot();
+     ExchangeNode exchange = new ExchangeNode(ctx.nextNodeId(), childRoot, false);
+     exchange.setNumInstances(childRoot.getNumInstances());
+
+     // The parent fragment has to exist before the child is given its destination.
+     PlanFragment parent = new PlanFragment(ctx.nextFragmentId(), exchange, parentPartition);
+     childFragment.setDestination(exchange);
+     childFragment.setOutputPartition(parentPartition);
+     return parent;
+ }
+
+ /**
+ * Helper that removes the need to catch checked exceptions in the main logic of the translator.
+ *
+ * <p>Runs the given function and rethrows any Exception it raises as a RuntimeException with
+ * the message "Unexpected Exception: " and the original exception as its cause.
+ *
+ * @param f function which would invoke the logic of
+ * stale code from old optimizer that could throw
+ * a checked exception
+ */
+ public void exec(FuncWrapper f) {
+ try {
+ f.exec();
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected Exception: ", e);
+ }
+ }
+
+ // Single-method callback whose body may throw any checked exception; consumed by exec(FuncWrapper).
+ private static interface FuncWrapper {
+ void exec() throws Exception;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
index c359a93ed7..77bf7ac625 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
@@ -44,4 +44,5 @@ public interface Plan<
@Override
Plan child(int index);
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java
new file mode 100644
index 0000000000..fc43e5fa30
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanContext.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans;
+
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNodeId;
+
+import com.clearspring.analytics.util.Lists;
+
+import java.util.List;
+
+/**
+ * Context of physical plan translation: owns the descriptor table, the plan node / fragment id
+ * generators and the list of fragments registered so far.
+ *
+ * <p>NOTE(review): {@code Lists} is imported from com.clearspring.analytics.util in this file
+ * rather than from Guava.
+ */
+public class PlanContext {
+
+    private final IdGenerator<PlanNodeId> nodeIdGenerator = PlanNodeId.createGenerator();
+
+    private final IdGenerator<PlanFragmentId> fragmentIdGenerator = PlanFragmentId.createGenerator();
+
+    private final DescriptorTable descTable = new DescriptorTable();
+
+    private final List<PlanFragment> planFragmentList = Lists.newArrayList();
+
+    /** Returns the next unused plan node id. */
+    public PlanNodeId nextNodeId() {
+        return nodeIdGenerator.getNextId();
+    }
+
+    /** Returns the next unused plan fragment id. */
+    public PlanFragmentId nextFragmentId() {
+        return fragmentIdGenerator.getNextId();
+    }
+
+    /** Creates a new tuple descriptor in the descriptor table of this context. */
+    public TupleDescriptor generateTupleDesc() {
+        return descTable.createTupleDescriptor();
+    }
+
+    /** Adds a slot descriptor to the given tuple, letting the descriptor table pick the id. */
+    public SlotDescriptor addSlotDesc(TupleDescriptor t) {
+        return descTable.addSlotDescriptor(t);
+    }
+
+    /** Adds a slot descriptor with the given id to the given tuple. */
+    public SlotDescriptor addSlotDesc(TupleDescriptor t, int id) {
+        return descTable.addSlotDescriptor(t, id);
+    }
+
+    /** Registers a fragment so that it shows up in {@link #getPlanFragmentList()}. */
+    public void addPlanFragment(PlanFragment planFragment) {
+        this.planFragmentList.add(planFragment);
+    }
+
+    /** Returns the live list of registered fragments, in registration order. */
+    public List<PlanFragment> getPlanFragmentList() {
+        return planFragmentList;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java
index ed03a95033..9301c294e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalPlan.java
@@ -35,4 +35,5 @@ public interface PhysicalPlan<
@Override
Plan child(int index);
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
index a01a867593..ff66e62191 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
@@ -17,6 +17,11 @@
package org.apache.doris.nereids.util;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+
+import java.util.List;
+
/**
* Utils for Nereids.
*/
@@ -34,4 +39,14 @@ public class Utils {
return part.replace("`", "``");
}
}
+
+ // TODO: implement later
+ // Currently a stub that always returns null and ignores all three arguments.
+ // Presumably meant to pick, out of eqExpr, the equality conjuncts relating the left and right
+ // slots - confirm when implementing. Callers must not dereference the result until then.
+ public static List<Expression> getEqConjuncts(List<Slot> left, List<Slot> right, Expression eqExpr) {
+ return null;
+ }
+
+ // TODO: implement later
+ // Currently a stub that always returns null and ignores its argument.
+ // Presumably meant to split expr into its top-level conjuncts - confirm when implementing.
+ // Callers must not dereference the result until then.
+ public static List<Expression> extractConjuncts(Expression expr) {
+ return null;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index fb16bdebe6..ac7c7fee96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -103,6 +103,16 @@ public class AggregationNode extends PlanNode {
&& aggInfo.getGroupingExprs().size() > 0;
}
+ // Used by new optimizer
+ // Directly overwrites the needsFinalize flag of this node with the given value.
+ public void setNeedsFinalize(boolean needsFinalize) {
+ this.needsFinalize = needsFinalize;
+ }
+
+ // Used by new optimizer
+ // Directly overwrites the useStreamingPreagg flag of this node with the given value.
+ public void setUseStreamingPreagg(boolean useStreamingPreagg) {
+ this.useStreamingPreagg = useStreamingPreagg;
+ }
+
@Override
public void setCompactData(boolean on) {
this.compactData = on;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 706a127c0a..0bc86e724a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -67,7 +67,7 @@ import java.util.stream.Collectors;
public class HashJoinNode extends PlanNode {
private final static Logger LOG = LogManager.getLogger(HashJoinNode.class);
- private final TableRef innerRef;
+ private TableRef innerRef;
private final JoinOperator joinOp;
// predicates of the form 'a=b' or 'a<=>b'
private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
@@ -84,8 +84,11 @@ public class HashJoinNode extends PlanNode {
private List<SlotId> hashOutputSlotIds;
- public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef,
- List<Expr> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
+ /**
+ * Constructor of HashJoinNode.
+ */
+ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef, List<Expr> eqJoinConjuncts,
+ List<Expr> otherJoinConjuncts) {
super(id, "HASH JOIN", NodeType.HASH_JOIN_NODE);
Preconditions.checkArgument(eqJoinConjuncts != null && !eqJoinConjuncts.isEmpty());
Preconditions.checkArgument(otherJoinConjuncts != null);
@@ -140,6 +143,63 @@ public class HashJoinNode extends PlanNode {
}
}
+ /**
+ * Constructor used by the new optimizer: takes joinOp directly (no TableRef), so innerRef is not assigned here.
+ */
+ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, JoinOperator joinOp, List<Expr> eqJoinConjuncts,
+ List<Expr> otherJoinConjuncts) {
+ super(id, "HASH JOIN", NodeType.HASH_JOIN_NODE);
+ Preconditions.checkArgument(eqJoinConjuncts != null && !eqJoinConjuncts.isEmpty());
+ Preconditions.checkArgument(otherJoinConjuncts != null);
+ tblRefIds.addAll(outer.getTblRefIds());
+ tblRefIds.addAll(inner.getTblRefIds());
+ this.joinOp = joinOp;
+ // TODO: let the non-vectorized exec engine also drop the unneeded side's tuple ids for semi/anti joins
+ if (VectorizedUtil.isVectorized()) {
+ if (joinOp.equals(JoinOperator.LEFT_ANTI_JOIN) || joinOp.equals(JoinOperator.LEFT_SEMI_JOIN)
+ || joinOp.equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) {
+ tupleIds.addAll(outer.getTupleIds());
+ } else if (joinOp.equals(JoinOperator.RIGHT_ANTI_JOIN) || joinOp.equals(JoinOperator.RIGHT_SEMI_JOIN)) {
+ tupleIds.addAll(inner.getTupleIds());
+ } else {
+ tupleIds.addAll(outer.getTupleIds());
+ tupleIds.addAll(inner.getTupleIds());
+ }
+ } else {
+ tupleIds.addAll(outer.getTupleIds());
+ tupleIds.addAll(inner.getTupleIds());
+ }
+
+ for (Expr eqJoinPredicate : eqJoinConjuncts) {
+ Preconditions.checkArgument(eqJoinPredicate instanceof BinaryPredicate);
+ BinaryPredicate eqJoin = (BinaryPredicate) eqJoinPredicate;
+ if (eqJoin.getOp().equals(BinaryPredicate.Operator.EQ_FOR_NULL)) {
+ Preconditions.checkArgument(eqJoin.getChildren().size() == 2);
+ if (!eqJoin.getChild(0).isNullable() || !eqJoin.getChild(1).isNullable()) {
+ eqJoin.setOp(BinaryPredicate.Operator.EQ);
+ }
+ }
+ this.eqJoinConjuncts.add(eqJoin);
+ }
+ this.distrMode = DistributionMode.NONE;
+ this.otherJoinConjuncts = otherJoinConjuncts;
+ children.add(outer);
+ children.add(inner);
+
+ // Inherit all nullable tuple ids from both children.
+ // Then mark the tuples on the "nullable" side of an outer join as nullable (both sides for FULL OUTER).
+ nullableTupleIds.addAll(inner.getNullableTupleIds());
+ nullableTupleIds.addAll(outer.getNullableTupleIds());
+ if (joinOp.equals(JoinOperator.FULL_OUTER_JOIN)) {
+ nullableTupleIds.addAll(outer.getTupleIds());
+ nullableTupleIds.addAll(inner.getTupleIds());
+ } else if (joinOp.equals(JoinOperator.LEFT_OUTER_JOIN)) {
+ nullableTupleIds.addAll(inner.getTupleIds());
+ } else if (joinOp.equals(JoinOperator.RIGHT_OUTER_JOIN)) {
+ nullableTupleIds.addAll(outer.getTupleIds());
+ }
+ }
+
public List<BinaryPredicate> getEqJoinConjuncts() {
return eqJoinConjuncts;
}
@@ -176,7 +236,7 @@ public class HashJoinNode extends PlanNode {
/**
* Calculate the slots output after going through the hash table in the hash join node.
* The most essential difference between 'hashOutputSlots' and 'outputSlots' is that
- * it's output needs to contain other conjunct and conjunct columns.
+ * its output needs to contain other conjunct and conjunct columns.
* hash output slots = output slots + conjunct slots + other conjunct slots
* For example:
* select b.k1 from test.t1 a right join test.t1 b on a.k1=b.k1 and b.k2>1 where a.k2>1;
@@ -185,6 +245,7 @@ public class HashJoinNode extends PlanNode {
* conjuncts: b.k2>1
* hash output slots: a.k2, b.k2, b.k1
* eq conjuncts: a.k1=b.k1
+ *
* @param slotIdList
*/
private void initHashOutputSlotIds(List<SlotId> slotIdList) {
@@ -204,8 +265,8 @@ public class HashJoinNode extends PlanNode {
outputSlotIds = Lists.newArrayList();
for (TupleId tupleId : tupleIds) {
for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getSlots()) {
- if (slotDescriptor.isMaterialized()
- && (requiredSlotIdSet == null || requiredSlotIdSet.contains(slotDescriptor.getId()))) {
+ if (slotDescriptor.isMaterialized() && (requiredSlotIdSet == null || requiredSlotIdSet.contains(
+ slotDescriptor.getId()))) {
outputSlotIds.add(slotDescriptor.getId());
}
}
@@ -244,13 +305,11 @@ public class HashJoinNode extends PlanNode {
computeStats(analyzer);
ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap();
- List<Expr> newEqJoinConjuncts =
- Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false);
- eqJoinConjuncts = newEqJoinConjuncts.stream()
- .map(entity -> (BinaryPredicate) entity).collect(Collectors.toList());
+ List<Expr> newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false);
+ eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity)
+ .collect(Collectors.toList());
assignedConjuncts = analyzer.getAssignedConjuncts();
- otherJoinConjuncts =
- Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false);
+ otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false);
}
private void replaceOutputSmapForOuterJoin() {
@@ -286,8 +345,7 @@ public class HashJoinNode extends PlanNode {
private final SlotDescriptor lhs;
private final SlotDescriptor rhs;
- private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs,
- SlotDescriptor rhs) {
+ private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs, SlotDescriptor rhs) {
this.eqJoinConjunct = eqJoinConjunct;
this.lhs = lhs;
this.rhs = rhs;
@@ -359,8 +417,7 @@ public class HashJoinNode extends PlanNode {
*/
public static Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> groupByJoinedTupleIds(
List<EqJoinConjunctScanSlots> eqJoinConjunctSlots) {
- Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> scanSlotsByJoinedTids =
- new LinkedHashMap<>();
+ Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> scanSlotsByJoinedTids = new LinkedHashMap<>();
for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) {
Pair<TupleId, TupleId> tids = Pair.create(slots.lhsTid(), slots.rhsTid());
List<EqJoinConjunctScanSlots> scanSlots = scanSlotsByJoinedTids.get(tids);
@@ -420,7 +477,8 @@ public class HashJoinNode extends PlanNode {
* - we adjust the NDVs from both sides to account for predicates that may
* might have reduce the cardinality and NDVs
*/
- private long getGenericJoinCardinality(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots, long lhsCard, long rhsCard) {
+ private long getGenericJoinCardinality(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots, long lhsCard,
+ long rhsCard) {
Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin());
Preconditions.checkState(!eqJoinConjunctSlots.isEmpty());
Preconditions.checkState(lhsCard >= 0 && rhsCard >= 0);
@@ -531,8 +589,8 @@ public class HashJoinNode extends PlanNode {
// FK/PK join (which doesn't alter the cardinality of the left-hand side)
cardinality = getChild(0).cardinality;
} else {
- cardinality = Math.round((double) getChild(0).cardinality * (double) getChild(
- 1).cardinality / (double) maxNumDistinct);
+ cardinality = Math.round(
+ (double) getChild(0).cardinality * (double) getChild(1).cardinality / (double) maxNumDistinct);
LOG.debug("lhs card: {}, rhs card: {}", getChild(0).cardinality, getChild(1).cardinality);
}
LOG.debug("stats HashJoin: cardinality {}", cardinality);
@@ -585,8 +643,7 @@ public class HashJoinNode extends PlanNode {
// Return -1 if the cardinality of the returned side is unknown.
long cardinality;
- if (joinOp == JoinOperator.RIGHT_SEMI_JOIN
- || joinOp == JoinOperator.RIGHT_ANTI_JOIN) {
+ if (joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp == JoinOperator.RIGHT_ANTI_JOIN) {
if (getChild(1).cardinality == -1) {
return -1;
}
@@ -640,8 +697,8 @@ public class HashJoinNode extends PlanNode {
@Override
protected String debugString() {
- return MoreObjects.toStringHelper(this).add("eqJoinConjuncts",
- eqJoinConjunctsDebugString()).addValue(super.debugString()).toString();
+ return MoreObjects.toStringHelper(this).add("eqJoinConjuncts", eqJoinConjunctsDebugString())
+ .addValue(super.debugString()).toString();
}
private String eqJoinConjunctsDebugString() {
@@ -750,10 +807,7 @@ public class HashJoinNode extends PlanNode {
}
public enum DistributionMode {
- NONE("NONE"),
- BROADCAST("BROADCAST"),
- PARTITIONED("PARTITIONED"),
- BUCKET_SHUFFLE("BUCKET_SHUFFLE");
+ NONE("NONE"), BROADCAST("BROADCAST"), PARTITIONED("PARTITIONED"), BUCKET_SHUFFLE("BUCKET_SHUFFLE");
private final String description;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org