You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/08 10:14:38 UTC
[flink] branch master updated: [FLINK-28599][table-planner] Adding FlinkJoinToMultiJoinRule to support translating left/right outer join to multi join
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6886e444c2b [FLINK-28599][table-planner] Adding FlinkJoinToMultiJoinRule to support translating left/right outer join to multi join
6886e444c2b is described below
commit 6886e444c2be97984aa95606120080b8db532feb
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Tue Jul 19 11:07:56 2022 +0800
[FLINK-28599][table-planner] Adding FlinkJoinToMultiJoinRule to support translating left/right outer join to multi join
This closes #20303
---
.../rules/logical/FlinkJoinToMultiJoinRule.java | 555 +++++++++++++++++
.../planner/plan/rules/FlinkBatchRuleSets.scala | 2 +-
.../plan/batch/sql/join/JoinReorderTest.xml | 307 +++++++++-
.../rules/logical/FlinkJoinToMultiJoinRuleTest.xml | 672 ++++++++++++++++++++-
.../plan/rules/logical/JoinToMultiJoinRuleTest.xml | 56 ++
.../plan/stream/sql/join/JoinReorderTest.xml | 291 +++++++++
.../planner/plan/common/JoinReorderTestBase.scala | 88 ++-
.../logical/FlinkJoinToMultiJoinRuleTest.scala | 211 ++++++-
.../runtime/batch/sql/join/JoinITCase.scala | 211 +++++++
9 files changed, 2362 insertions(+), 31 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java
new file mode 100644
index 00000000000..838062dc15c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rel.rules.FilterMultiJoinMergeRule;
+import org.apache.calcite.rel.rules.MultiJoin;
+import org.apache.calcite.rel.rules.ProjectMultiJoinMergeRule;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink Planner rule to flatten a tree of {@link LogicalJoin}s into a single {@link MultiJoin} with
+ * N inputs.
+ *
+ * <p>This rule is copied from {@link org.apache.calcite.rel.rules.JoinToMultiJoinRule}. In this
+ * rule, we support richer join types to convert to one multi join set, like left outer join and
+ * right outer join, by rewriting the {@code canCombine()} method.
+ *
+ * <p>An input is not flattened if the input is a null generating input in an outer join, i.e.,
+ * either input in a full outer join, the right side of a left outer join, or the left side of a
+ * right outer join. An input is also not flattened if it contains a full outer join, or if the
+ * enclosing join is a left/right outer join and the input contains an outer join of the opposite
+ * direction. Semi and anti joins are not matched by this rule at all.
+ *
+ * <p>Join conditions are also pulled up from the inputs into the topmost {@link MultiJoin}.
+ * Post-join filters are pulled up only from inputs that are actually flattened; an input that is
+ * kept intact continues to apply its own post-join filter.
+ *
+ * <p>Outer join information is also stored in the {@link MultiJoin}. A boolean flag indicates if
+ * the join is a full outer join, and in the case of left and right outer joins, the join type and
+ * outer join conditions are stored in arrays in the {@link MultiJoin}. This outer join information
+ * is associated with the null generating input in the outer join. So, in the case of a left outer
+ * join between A and B, the information is associated with B, not A.
+ *
+ * <p>Here are examples of the {@link MultiJoin}s constructed after this rule has been applied on
+ * following join trees.
+ *
+ * <ul>
+ *   <li>A JOIN B → MJ(A, B)
+ *   <li>A JOIN B JOIN C → MJ(A, B, C)
+ *   <li>A LEFT JOIN B → MJ(A, B)
+ *   <li>A RIGHT JOIN B → MJ(A, B)
+ *   <li>A FULL JOIN B → MJ[full](A, B)
+ *   <li>A LEFT JOIN (B JOIN C) → MJ(A, MJ(B, C))
+ *   <li>(A JOIN B) LEFT JOIN C → MJ(A, B, C)
+ *   <li>(A LEFT JOIN B) JOIN C → MJ(A, B, C)
+ *   <li>(A LEFT JOIN B) LEFT JOIN C → MJ(A, B, C)
+ *   <li>(A RIGHT JOIN B) RIGHT JOIN C → MJ(MJ(A, B), C)
+ *   <li>(A LEFT JOIN B) RIGHT JOIN C → MJ(MJ(A, B), C)
+ *   <li>(A RIGHT JOIN B) LEFT JOIN C → MJ(MJ(A, B), C)
+ *   <li>A LEFT JOIN (B FULL JOIN C) → MJ(A, MJ[full](B, C))
+ *   <li>(A LEFT JOIN B) FULL JOIN (C RIGHT JOIN D) → MJ[full](MJ(A, B), MJ(C, D))
+ *   <li>SEMI JOIN and ANTI JOIN are not supported yet.
+ * </ul>
+ *
+ * <p>The constructor is parameterized to allow any sub-class of {@link Join}, not just {@link
+ * LogicalJoin}.
+ *
+ * @see FilterMultiJoinMergeRule
+ * @see ProjectMultiJoinMergeRule
+ * @see CoreRules#JOIN_TO_MULTI_JOIN
+ */
+public class FlinkJoinToMultiJoinRule extends RelRule<FlinkJoinToMultiJoinRule.Config>
+        implements TransformationRule {
+
+    public static final FlinkJoinToMultiJoinRule INSTANCE =
+            FlinkJoinToMultiJoinRule.Config.DEFAULT.toRule();
+
+    /** Creates a FlinkJoinToMultiJoinRule. */
+    public FlinkJoinToMultiJoinRule(Config config) {
+        super(config);
+    }
+
+    @Deprecated // to be removed before 2.0
+    public FlinkJoinToMultiJoinRule(Class<? extends Join> clazz) {
+        this(Config.DEFAULT.withOperandFor(clazz));
+    }
+
+    @Deprecated // to be removed before 2.0
+    public FlinkJoinToMultiJoinRule(
+            Class<? extends Join> joinClass, RelBuilderFactory relBuilderFactory) {
+        this(
+                Config.DEFAULT
+                        .withRelBuilderFactory(relBuilderFactory)
+                        .as(Config.class)
+                        .withOperandFor(joinClass));
+    }
+
+    // ~ Methods ----------------------------------------------------------------
+
+    /**
+     * Matches only joins whose output projects the right input, which rules out SEMI and ANTI
+     * joins.
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final Join origJoin = call.rel(0);
+        return origJoin.getJoinType().projectsRight();
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final Join origJoin = call.rel(0);
+        final RelNode left = call.rel(1);
+        final RelNode right = call.rel(2);
+
+        // Combine the children MultiJoin inputs into an array of inputs for the new MultiJoin.
+        final List<ImmutableBitSet> projFieldsList = new ArrayList<>();
+        final List<int[]> joinFieldRefCountsList = new ArrayList<>();
+        final List<RelNode> newInputs =
+                combineInputs(origJoin, left, right, projFieldsList, joinFieldRefCountsList);
+
+        // Combine the outer join information from the left and right inputs, and include the outer
+        // join information from the current join, if it's a left/right outer join.
+        final List<Pair<JoinRelType, RexNode>> joinSpecs = new ArrayList<>();
+        combineOuterJoins(origJoin, newInputs, left, right, joinSpecs);
+
+        // Pull up the join filters from the children MultiJoinRels and combine them with the join
+        // filter associated with this LogicalJoin to form the join filter for the new MultiJoin.
+        List<RexNode> newJoinFilters = combineJoinFilters(origJoin, left, right);
+
+        // Add on the join field reference counts for the join condition associated with this
+        // LogicalJoin.
+        final com.google.common.collect.ImmutableMap<Integer, ImmutableIntList>
+                newJoinFieldRefCountsMap =
+                        addOnJoinFieldRefCounts(
+                                newInputs,
+                                origJoin.getRowType().getFieldCount(),
+                                origJoin.getCondition(),
+                                joinFieldRefCountsList);
+
+        List<RexNode> newPostJoinFilters = combinePostJoinFilters(origJoin, left, right);
+
+        final RexBuilder rexBuilder = origJoin.getCluster().getRexBuilder();
+        RelNode multiJoin =
+                new MultiJoin(
+                        origJoin.getCluster(),
+                        newInputs,
+                        RexUtil.composeConjunction(rexBuilder, newJoinFilters),
+                        origJoin.getRowType(),
+                        origJoin.getJoinType() == JoinRelType.FULL,
+                        Pair.right(joinSpecs),
+                        Pair.left(joinSpecs),
+                        projFieldsList,
+                        newJoinFieldRefCountsMap,
+                        RexUtil.composeConjunction(rexBuilder, newPostJoinFilters, true));
+
+        call.transformTo(multiJoin);
+    }
+
+    /**
+     * Combines the inputs into a LogicalJoin into an array of inputs.
+     *
+     * @param join original join
+     * @param left left input into join
+     * @param right right input into join
+     * @param projFieldsList returns a list of the new combined projection fields
+     * @param joinFieldRefCountsList returns a list of the new combined join field reference counts
+     * @return combined left and right inputs in an array
+     */
+    private List<RelNode> combineInputs(
+            Join join,
+            RelNode left,
+            RelNode right,
+            List<ImmutableBitSet> projFieldsList,
+            List<int[]> joinFieldRefCountsList) {
+        final List<RelNode> newInputs = new ArrayList<>();
+
+        // Leave the null generating sides of an outer join intact; don't pull up those children
+        // inputs into the array we're constructing.
+        if (canCombine(left, join.getJoinType(), join.getJoinType().generatesNullsOnLeft())) {
+            final MultiJoin leftMultiJoin = (MultiJoin) left;
+            for (int i = 0; i < left.getInputs().size(); i++) {
+                newInputs.add(leftMultiJoin.getInput(i));
+                projFieldsList.add(leftMultiJoin.getProjFields().get(i));
+                joinFieldRefCountsList.add(
+                        leftMultiJoin.getJoinFieldRefCountsMap().get(i).toIntArray());
+            }
+        } else {
+            newInputs.add(left);
+            projFieldsList.add(null);
+            joinFieldRefCountsList.add(new int[left.getRowType().getFieldCount()]);
+        }
+
+        if (canCombine(right, join.getJoinType(), join.getJoinType().generatesNullsOnRight())) {
+            final MultiJoin rightMultiJoin = (MultiJoin) right;
+            for (int i = 0; i < right.getInputs().size(); i++) {
+                newInputs.add(rightMultiJoin.getInput(i));
+                projFieldsList.add(rightMultiJoin.getProjFields().get(i));
+                joinFieldRefCountsList.add(
+                        rightMultiJoin.getJoinFieldRefCountsMap().get(i).toIntArray());
+            }
+        } else {
+            newInputs.add(right);
+            projFieldsList.add(null);
+            joinFieldRefCountsList.add(new int[right.getRowType().getFieldCount()]);
+        }
+
+        return newInputs;
+    }
+
+    /**
+     * Combines the outer join conditions and join types from the left and right join inputs. If the
+     * join itself is either a left or right outer join, then the join condition corresponding to
+     * the join is also set in the position corresponding to the null-generating input into the
+     * join. The join type is also set.
+     *
+     * @param joinRel join rel
+     * @param combinedInputs the combined inputs to the join
+     * @param left left child of the joinrel
+     * @param right right child of the joinrel
+     * @param joinSpecs the list where the join types and conditions will be copied
+     */
+    private void combineOuterJoins(
+            Join joinRel,
+            List<RelNode> combinedInputs,
+            RelNode left,
+            RelNode right,
+            List<Pair<JoinRelType, RexNode>> joinSpecs) {
+        JoinRelType joinType = joinRel.getJoinType();
+        boolean leftCombined = canCombine(left, joinType, joinType.generatesNullsOnLeft());
+        boolean rightCombined = canCombine(right, joinType, joinType.generatesNullsOnRight());
+        switch (joinType) {
+            case LEFT:
+                if (leftCombined) {
+                    copyOuterJoinInfo((MultiJoin) left, joinSpecs, 0, null, null);
+                } else {
+                    joinSpecs.add(Pair.of(JoinRelType.INNER, null));
+                }
+                // The right input of a LEFT join is null generating and is never flattened, so it
+                // occupies exactly one slot: the one carrying this join's type and condition.
+                joinSpecs.add(Pair.of(joinType, joinRel.getCondition()));
+                break;
+            case RIGHT:
+                // Mirror image of LEFT: the single (unflattened) left input carries this join.
+                joinSpecs.add(Pair.of(joinType, joinRel.getCondition()));
+                if (rightCombined) {
+                    copyOuterJoinInfo(
+                            (MultiJoin) right,
+                            joinSpecs,
+                            left.getRowType().getFieldCount(),
+                            right.getRowType().getFieldList(),
+                            joinRel.getRowType().getFieldList());
+                } else {
+                    joinSpecs.add(Pair.of(JoinRelType.INNER, null));
+                }
+                break;
+            default:
+                if (leftCombined) {
+                    copyOuterJoinInfo((MultiJoin) left, joinSpecs, 0, null, null);
+                } else {
+                    joinSpecs.add(Pair.of(JoinRelType.INNER, null));
+                }
+                if (rightCombined) {
+                    copyOuterJoinInfo(
+                            (MultiJoin) right,
+                            joinSpecs,
+                            left.getRowType().getFieldCount(),
+                            right.getRowType().getFieldList(),
+                            joinRel.getRowType().getFieldList());
+                } else {
+                    joinSpecs.add(Pair.of(JoinRelType.INNER, null));
+                }
+        }
+    }
+
+    /**
+     * Copies outer join data from a source MultiJoin to a new set of arrays. Also adjusts the
+     * conditions to reflect the new position of an input if that input ends up being shifted to the
+     * right.
+     *
+     * @param multiJoin the source MultiJoin
+     * @param destJoinSpecs the list where the join types and conditions will be copied
+     * @param adjustmentAmount if &gt; 0, the amount the RexInputRefs in the join conditions need to
+     *     be adjusted by
+     * @param srcFields the source fields that the original join conditions are referencing
+     * @param destFields the destination fields that the new join conditions reference
+     */
+    private void copyOuterJoinInfo(
+            MultiJoin multiJoin,
+            List<Pair<JoinRelType, RexNode>> destJoinSpecs,
+            int adjustmentAmount,
+            List<RelDataTypeField> srcFields,
+            List<RelDataTypeField> destFields) {
+        final List<Pair<JoinRelType, RexNode>> srcJoinSpecs =
+                Pair.zip(multiJoin.getJoinTypes(), multiJoin.getOuterJoinConditions());
+
+        if (adjustmentAmount == 0) {
+            destJoinSpecs.addAll(srcJoinSpecs);
+        } else {
+            assert srcFields != null;
+            assert destFields != null;
+            int nFields = srcFields.size();
+            int[] adjustments = new int[nFields];
+            for (int idx = 0; idx < nFields; idx++) {
+                adjustments[idx] = adjustmentAmount;
+            }
+            for (Pair<JoinRelType, RexNode> src : srcJoinSpecs) {
+                destJoinSpecs.add(
+                        Pair.of(
+                                src.left,
+                                src.right == null
+                                        ? null
+                                        : src.right.accept(
+                                                new RelOptUtil.RexInputConverter(
+                                                        multiJoin.getCluster().getRexBuilder(),
+                                                        srcFields,
+                                                        destFields,
+                                                        adjustments))));
+            }
+        }
+    }
+
+    /**
+     * Combines the join filters from the left and right inputs (if they are MultiJoinRels) with the
+     * join filter in the joinrel into a single AND'd join filter, unless the inputs correspond to
+     * null generating inputs in an outer join.
+     *
+     * @param join Join
+     * @param left Left input of the join
+     * @param right Right input of the join
+     * @return combined join filters AND-ed together
+     */
+    private List<RexNode> combineJoinFilters(Join join, RelNode left, RelNode right) {
+        JoinRelType joinType = join.getJoinType();
+
+        // AND the join condition if this isn't a left or right outer join; In those cases, the
+        // outer join condition is already tracked separately.
+        final List<RexNode> filters = new ArrayList<>();
+        if ((joinType != JoinRelType.LEFT) && (joinType != JoinRelType.RIGHT)) {
+            filters.add(join.getCondition());
+        }
+        if (canCombine(left, joinType, joinType.generatesNullsOnLeft())) {
+            filters.add(((MultiJoin) left).getJoinFilter());
+        }
+        // Need to adjust the RexInputs of the right child, since those need to shift over to the
+        // right.
+        if (canCombine(right, joinType, joinType.generatesNullsOnRight())) {
+            MultiJoin multiJoin = (MultiJoin) right;
+            filters.add(shiftRightFilter(join, left, multiJoin, multiJoin.getJoinFilter()));
+        }
+
+        return filters;
+    }
+
+    /**
+     * Returns whether an input can be merged into a given relational expression without changing
+     * semantics.
+     *
+     * @param input input into a join
+     * @param joinType join type of the join that {@code input} feeds into
+     * @param nullGenerating true if the input is null generating
+     * @return true if the input can be combined into a parent MultiJoin
+     */
+    private boolean canCombine(RelNode input, JoinRelType joinType, boolean nullGenerating) {
+        if (!(input instanceof MultiJoin)) {
+            return false;
+        }
+        MultiJoin join = (MultiJoin) input;
+        if (join.isFullOuterJoin() || nullGenerating) {
+            return false;
+        }
+        for (JoinRelType type : join.getJoinTypes()) {
+            if (type == JoinRelType.FULL) {
+                return false;
+            }
+            // Under an outer parent join, the child may only contain outer joins of the same
+            // direction: (A LEFT JOIN B) LEFT JOIN C can be flattened, while
+            // (A RIGHT JOIN B) LEFT JOIN C cannot. Inner parents accept any LEFT/RIGHT child.
+            if (joinType != JoinRelType.INNER
+                    && (type == JoinRelType.LEFT || type == JoinRelType.RIGHT)
+                    && joinType != type) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Shifts a filter originating from the right child of the LogicalJoin to the right, to reflect
+     * the filter now being applied on the resulting MultiJoin.
+     *
+     * @param joinRel the original LogicalJoin
+     * @param left the left child of the LogicalJoin
+     * @param right the right child of the LogicalJoin
+     * @param rightFilter the filter originating from the right child
+     * @return the adjusted right filter, or {@code null} if {@code rightFilter} is {@code null}
+     */
+    private RexNode shiftRightFilter(
+            Join joinRel, RelNode left, MultiJoin right, RexNode rightFilter) {
+        if (rightFilter == null) {
+            return null;
+        }
+
+        int nFieldsOnLeft = left.getRowType().getFieldList().size();
+        int nFieldsOnRight = right.getRowType().getFieldList().size();
+        int[] adjustments = new int[nFieldsOnRight];
+        for (int i = 0; i < nFieldsOnRight; i++) {
+            adjustments[i] = nFieldsOnLeft;
+        }
+        rightFilter =
+                rightFilter.accept(
+                        new RelOptUtil.RexInputConverter(
+                                joinRel.getCluster().getRexBuilder(),
+                                right.getRowType().getFieldList(),
+                                joinRel.getRowType().getFieldList(),
+                                adjustments));
+        return rightFilter;
+    }
+
+    /**
+     * Adds on to the existing join condition reference counts the references from the new join
+     * condition.
+     *
+     * @param multiJoinInputs inputs into the new MultiJoin
+     * @param nTotalFields total number of fields in the MultiJoin
+     * @param joinCondition the new join condition
+     * @param origJoinFieldRefCounts existing join condition reference counts
+     * @return map from input ordinal to the combined join field reference counts of that input
+     */
+    private com.google.common.collect.ImmutableMap<Integer, ImmutableIntList>
+            addOnJoinFieldRefCounts(
+                    List<RelNode> multiJoinInputs,
+                    int nTotalFields,
+                    RexNode joinCondition,
+                    List<int[]> origJoinFieldRefCounts) {
+        // count the input references in the join condition
+        int[] joinCondRefCounts = new int[nTotalFields];
+        joinCondition.accept(new InputReferenceCounter(joinCondRefCounts));
+
+        // first, make a copy of the ref counters
+        final Map<Integer, int[]> refCountsMap = new HashMap<>();
+        int nInputs = multiJoinInputs.size();
+        int currInput = 0;
+        for (int[] origRefCounts : origJoinFieldRefCounts) {
+            refCountsMap.put(currInput, origRefCounts.clone());
+            currInput++;
+        }
+
+        // add on to the counts for each input into the MultiJoin the
+        // reference counts computed for the current join condition
+        currInput = -1;
+        int startField = 0;
+        int nFields = 0;
+        for (int i = 0; i < nTotalFields; i++) {
+            if (joinCondRefCounts[i] == 0) {
+                continue;
+            }
+            // Advance to the input whose field range [startField, startField + nFields) holds i.
+            while (i >= (startField + nFields)) {
+                startField += nFields;
+                currInput++;
+                assert currInput < nInputs;
+                nFields = multiJoinInputs.get(currInput).getRowType().getFieldCount();
+            }
+            int[] refCounts = refCountsMap.get(currInput);
+            refCounts[i - startField] += joinCondRefCounts[i];
+        }
+
+        final com.google.common.collect.ImmutableMap.Builder<Integer, ImmutableIntList> builder =
+                com.google.common.collect.ImmutableMap.builder();
+        for (Map.Entry<Integer, int[]> entry : refCountsMap.entrySet()) {
+            builder.put(entry.getKey(), ImmutableIntList.of(entry.getValue()));
+        }
+        return builder.build();
+    }
+
+    /**
+     * Combines the post-join filters from the left and right inputs into a single list to be
+     * AND'd together.
+     *
+     * <p>Only inputs that are actually flattened into the new MultiJoin (see {@link
+     * #canCombine}) contribute their post-join filter. An input that is kept intact, e.g. the null
+     * generating side of an outer join or either side of a full outer join, remains an input of
+     * the new MultiJoin and still applies its own post-join filter; pulling a copy of that filter
+     * above the outer join would apply it twice and could eliminate null-extended rows.
+     *
+     * @param joinRel the original LogicalJoin
+     * @param left left child of the LogicalJoin
+     * @param right right child of the LogicalJoin
+     * @return post-join filters of the flattened inputs (entries may be {@code null})
+     */
+    private List<RexNode> combinePostJoinFilters(Join joinRel, RelNode left, RelNode right) {
+        final JoinRelType joinType = joinRel.getJoinType();
+        final List<RexNode> filters = new ArrayList<>();
+        if (canCombine(right, joinType, joinType.generatesNullsOnRight())) {
+            final MultiJoin multiRight = (MultiJoin) right;
+            filters.add(
+                    shiftRightFilter(joinRel, left, multiRight, multiRight.getPostJoinFilter()));
+        }
+
+        if (canCombine(left, joinType, joinType.generatesNullsOnLeft())) {
+            filters.add(((MultiJoin) left).getPostJoinFilter());
+        }
+
+        return filters;
+    }
+
+    // ~ Inner Classes ----------------------------------------------------------
+
+    /** Visitor that keeps a reference count of the inputs used by an expression. */
+    private static class InputReferenceCounter extends RexVisitorImpl<Void> {
+        private final int[] refCounts;
+
+        InputReferenceCounter(int[] refCounts) {
+            super(true);
+            this.refCounts = refCounts;
+        }
+
+        @Override
+        public Void visitInputRef(RexInputRef inputRef) {
+            refCounts[inputRef.getIndex()]++;
+            return null;
+        }
+    }
+
+    /** Rule configuration. */
+    public interface Config extends RelRule.Config {
+        Config DEFAULT = EMPTY.as(Config.class).withOperandFor(LogicalJoin.class);
+
+        @Override
+        default FlinkJoinToMultiJoinRule toRule() {
+            return new FlinkJoinToMultiJoinRule(this);
+        }
+
+        /** Defines an operand tree for the given classes. */
+        default Config withOperandFor(Class<? extends Join> joinClass) {
+            return withOperandSupplier(
+                            b0 ->
+                                    b0.operand(joinClass)
+                                            .inputs(
+                                                    b1 -> b1.operand(RelNode.class).anyInputs(),
+                                                    b2 -> b2.operand(RelNode.class).anyInputs()))
+                    .as(Config.class);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index e8e0064741c..6513355e61b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -214,7 +214,7 @@ object FlinkBatchRuleSets {
val JOIN_REORDER_PREPARE_RULES: RuleSet = RuleSets.ofList(
// merge join to MultiJoin
- CoreRules.JOIN_TO_MULTI_JOIN,
+ FlinkJoinToMultiJoinRule.INSTANCE,
// merge project to MultiJoin
CoreRules.PROJECT_MULTI_JOIN_MERGE,
// merge filter to MultiJoin
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
index 464efd2bd13..dc25d68b66f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/JoinReorderTest.xml
@@ -87,18 +87,19 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-HashJoin(joinType=[LeftOuterJoin], where=[(a4 = a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], build=[right])
-:- Exchange(distribution=[hash[a4]])
-: +- MultipleInput(readOrder=[0,0,1,0], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[LeftOuterJoin], where=[(a2 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])\n: :- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])\n: : :- [#3] LegacyTableS [...]
-: :- Exchange(distribution=[broadcast])
-: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
-: :- Exchange(distribution=[broadcast])
-: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
-: :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
-: +- Exchange(distribution=[broadcast])
-: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
-+- Exchange(distribution=[hash[a5]])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[(a4 = a5)], select=[a1, b1, c1, a2, b2, c2, a4, b4, c4, a3, b3, c3, a5, b5, c5], build=[right])\n:- HashJoin(joinType=[LeftOuterJoin], where=[(a2 = a3)], select=[a1, b1, c1, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])\n: :- [#2] Exchange(distribution=[hash[a4]])\n: +- [#3] Exchange(distribution=[broadcast])\n+- [#1] Exchange(distribution=[hash[a5]])\n])
+ :- Exchange(distribution=[hash[a5]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ :- Exchange(distribution=[hash[a4]])
+ : +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[LeftOuterJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])\n: :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])\n: +- [#3] Exchange(distribution=[b [...]
+ : :- Exchange(distribution=[broadcast])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ : :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[broadcast])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[broadcast])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
]]>
</Resource>
</TestCase>
@@ -373,18 +374,196 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
-+- MultipleInput(readOrder=[0,0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])\n: :- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])\n: : :- [#3] Exchange(distribut [...]
++- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[broadcast])
+ +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])\n: :- [#2] Exchange(distribution=[hash[a1]])\n: +- [#3] Exchange(distribution=[hash[a2]])\n+- [#1] Exchange(distribution=[broadcast])\n])
+ :- Exchange(distribution=[broadcast])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ :- Exchange(distribution=[hash[a1]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ +- Exchange(distribution=[hash[a2]])
+ +- HashJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[broadcast])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinAntiJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ JOIN T3 ON a2 = a3
+ JOIN T4 ON a1 = a4
+ WHERE NOT EXISTS (SELECT a5 FROM T5 WHERE a1 = a5)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a1, $0)])
+ LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a2 = a4)], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3], build=[right])\n: :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])\n: +- [#3] Exchange(distribution=[broad [...]
:- Exchange(distribution=[broadcast])
- : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[broadcast])
+ +- MultipleInput(readOrder=[0,1,0], members=[\nNestedLoopJoin(joinType=[LeftAntiJoin], where=[(a1 = a5)], select=[a1, b1, c1, a3, b3, c3], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a3 = a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n: :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])\n: +- [#3] Exchange(distribution=[broadcast])\n+- [#1] Exchange(di [...]
+ :- Exchange(distribution=[broadcast])
+ : +- LocalHashAggregate(groupBy=[a5], select=[a5])
+ : +- Calc(select=[a5])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ +- Exchange(distribution=[broadcast])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ LEFT OUTER JOIN T3 ON a1 = a3
+ JOIN T4 ON a1 = a4
+ LEFT OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[broadcast])
+ +- MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[RightOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a4, b4, c4, a3, b3, c3], build=[right])\n:- [#1] Exchange(distribution=[hash[a5]])\n+- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right])\n :- [#2] Exchange(distribution=[hash[a4]])\n +- [#3] Exchange(distribution=[broadcast])\n])
+ :- Exchange(distribution=[hash[a5]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ :- Exchange(distribution=[hash[a4]])
+ : +- HashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a4, b4, c4], isBroadcast=[true], build=[right])
+ : :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[broadcast])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[broadcast])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinRightOuterJoinInnerJoinRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ RIGHT OUTER JOIN T3 ON a1 = a3
+ JOIN T4 ON a1 = a4
+ RIGHT OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[right])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[LeftOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[broadcast])
+ +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+ +- HashJoin(joinType=[InnerJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[broadcast])
+ +- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[left])
+ :- Exchange(distribution=[broadcast])
+ : +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right])
+ : :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[broadcast])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinSemiJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ JOIN T3 ON a2 = a3
+ JOIN T4 ON a1 = a4
+ WHERE a1 IN (SELECT a5 FROM T5)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a5=[$0])
+ LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+})])
+ +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a2 = a4)], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a2 = a3)], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n: :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])\n: +- [#3] Exchange(distr [...]
:- Exchange(distribution=[broadcast])
- : +- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right])
- : :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
- : +- Exchange(distribution=[broadcast])
- : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
- :- Exchange(distribution=[hash[a1]])
- : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
- +- Exchange(distribution=[hash[a2]])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[broadcast])
+ +- MultipleInput(readOrder=[0,1,0], members=[\nNestedLoopJoin(joinType=[LeftSemiJoin], where=[(a1 = a5)], select=[a1, b1, c1, a3, b3, c3], build=[right])\n:- HashJoin(joinType=[InnerJoin], where=[(a3 = a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n: :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])\n: +- [#3] Exchange(distribution=[broadcast])\n+- [#1] Exchange(di [...]
+ :- Exchange(distribution=[broadcast])
+ : +- LocalHashAggregate(groupBy=[a5], select=[a5])
+ : +- Calc(select=[a5])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ +- Exchange(distribution=[broadcast])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
]]>
</Resource>
</TestCase>
@@ -484,6 +663,90 @@ Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5])
+- Exchange(distribution=[broadcast])
+- Calc(select=[a2, c2])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ LEFT OUTER JOIN T2 ON a1 = a2
+ JOIN T3 ON a1 = a3
+ LEFT OUTER JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a2, b2, c2, a5, b5, c5, a4, b4, c4, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[left])\n:- [#1] Exchange(distribution=[broadcast])\n+- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a4, b4, c4, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTab [...]
+ :- Exchange(distribution=[broadcast])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[broadcast])
+ +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[RightOuterJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[left])\n:- [#1] Exchange(distribution=[broadcast])\n+- HashJoin(joinType=[InnerJoin], where=[(a1 = a3)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right])\n :- [#2] LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c [...]
+ :- Exchange(distribution=[broadcast])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ +- Exchange(distribution=[broadcast])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinInnerJoinRightOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ RIGHT OUTER JOIN T2 ON a1 = a2
+ JOIN T3 ON a1 = a3
+ RIGHT OUTER JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[right])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- HashJoin(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[broadcast])
+ +- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[broadcast])
+ +- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right])\n:- HashJoin(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], build=[right])\n: :- [#2] Exchange(distribution=[hash[a1]])\n: +- [#3] Exchange(distribution=[hash[a2]])\n+- [#1] Exchange(distribution=[broadcast])\n])
+ :- Exchange(distribution=[broadcast])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ :- Exchange(distribution=[hash[a1]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ +- Exchange(distribution=[hash[a2]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
index d0aa091f00b..e270c6e7f75 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml
@@ -16,7 +16,140 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testDoesNotMatchAntiJoin">
+ <TestCase name="testFullOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c, T3 WHERE a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($0, $4)])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[=($0, $4)])
+ +- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+ :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFullOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFullOuterJoinLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $4)]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFullOuterJoinRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFullOuterJoinSemiJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(e=[$0])
+ LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+})])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[semi])
+ :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinAntiJoin">
<Resource name="sql">
<![CDATA[
SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) t
@@ -49,7 +182,195 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
]]>
</Resource>
</TestCase>
- <TestCase name="testDoesNotMatchSemiJoin">
+ <TestCase name="testInnerJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1, T2, T3 WHERE a = c AND a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($0, $4))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($0, $4))])
+ +- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[ALL, ALL, ALL]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinAntiJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t
+WHERE NOT EXISTS (SELECT e FROM T3 WHERE a = e)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[anti])
+ :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[{0, 1}, {0, 1}]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 JOIN T2 ON a =c LEFT OUTER JOIN T3 ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, LEFT]], outerJoinConditions=[[NULL, NULL, =($0, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1 JOIN T2 ON a = c LEFT OUTER JOIN
+(SELECT * FROM T3) ON a = e JOIN
+(SELECT * FROM T4) ON a = g LEFT OUTER JOIN
+(SELECT * FROM T5) ON a = i
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+ : : :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ : : +- LogicalProject(e=[$0], f=[$1])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+ : +- LogicalProject(g=[$0], h=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+ +- LogicalProject(i=[$0], j=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[AND(=($0, $6), =($0, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, LEFT, INNER, LEFT]], outerJoinConditions=[[NULL, NULL, =($0, $4), NULL, =($0, $8)]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+:- LogicalProject(e=[$0], f=[$1])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalProject(g=[$0], h=[$1])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalProject(i=[$0], j=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 JOIN T2 ON a =c RIGHT OUTER JOIN T3 ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinRightOuterJoinInnerJoinRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1 JOIN T2 ON a = c RIGHT OUTER JOIN
+(SELECT * FROM T3) ON a = e JOIN
+(SELECT * FROM T4) ON a = g RIGHT OUTER JOIN
+(SELECT * FROM T5) ON a = i
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+ : : :- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ : : +- LogicalProject(e=[$0], f=[$1])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+ : +- LogicalProject(g=[$0], h=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+ +- LogicalProject(i=[$0], j=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $8), NULL]], projFields=[[{0, 1, 2, 3, 4, 5, 6, 7}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $6)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $4), NULL, NULL]], projFields=[[ALL, ALL, ALL]])
+: :- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]])
+: : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+: :- LogicalProject(e=[$0], f=[$1])
+: : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+: +- LogicalProject(g=[$0], h=[$1])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalProject(i=[$0], j=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinSemiJoin">
<Resource name="sql">
<![CDATA[SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) t WHERE a IN (SELECT e FROM T3)]]>
</Resource>
@@ -75,6 +396,353 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+- LogicalProject(e=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinAntiJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c) t
+WHERE NOT EXISTS (SELECT e FROM T3 WHERE a = e)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[anti])
+ :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[{0, 1}, {0, 1}]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN
+(SELECT * FROM T3) ON a = e LEFT OUTER JOIN
+(SELECT * FROM T4) ON a = g JOIN
+(SELECT * FROM T5) ON a = i
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+ : :- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ : : +- LogicalProject(e=[$0], f=[$1])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+ : +- LogicalProject(g=[$0], h=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+ +- LogicalProject(i=[$0], j=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[AND(=($0, $8), =($0, $4))], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, INNER, LEFT, INNER]], outerJoinConditions=[[NULL, =($0, $2), NULL, =($0, $6), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+:- LogicalProject(e=[$0], f=[$1])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalProject(g=[$0], h=[$1])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalProject(i=[$0], j=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, LEFT]], outerJoinConditions=[[NULL, =($0, $2), =($0, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinSemiJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(e=[$0])
+ LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+})])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[semi])
+ :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[ALL, ALL]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[left])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT, INNER]], outerJoinConditions=[[NULL, =($0, $2), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinInnerJoinRightOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN
+(SELECT * FROM T3) ON a = e RIGHT OUTER JOIN
+(SELECT * FROM T4) ON a = g JOIN
+(SELECT * FROM T5) ON a = i
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7], i=[$8], j=[$9])
++- LogicalJoin(condition=[=($0, $8)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $6)], joinType=[right])
+ : :- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ : : +- LogicalProject(e=[$0], f=[$1])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+ : +- LogicalProject(g=[$0], h=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
+ +- LogicalProject(i=[$0], j=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $8)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $6), NULL, NULL]], projFields=[[{0, 1, 2, 3, 4, 5}, {0, 1}, {0, 1}]])
+:- MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $2), NULL, NULL]], projFields=[[ALL, ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+: +- LogicalProject(e=[$0], f=[$1])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalProject(g=[$0], h=[$1])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(g, h)]]])
++- LogicalProject(i=[$0], j=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(i, j)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $4)]], projFields=[[{0, 1, 2, 3}, {0, 1}]])
+:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+: :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinSemiJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(e=[$0])
+ LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+})])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
++- LogicalJoin(condition=[=($0, $4)], joinType=[semi])
+ :- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER, INNER]], outerJoinConditions=[[=($0, $2), NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
++- LogicalProject(e=[$0], f=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSubRightOuterJoinQuery">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.a = T3.e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(e=[$0], f=[$1], a=[$2], b=[$3], c=[$4], d=[$5])
++- LogicalJoin(condition=[=($2, $0)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[right])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, RIGHT, INNER]], outerJoinConditions=[[=($2, $0), =($2, $4), NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+:- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRuleTest.xml
new file mode 100644
index 00000000000..bd6e95faf3d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRuleTest.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testLeftOuterJoinRightOuterJoinToMultiJoin">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8])
++- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
+ +- LogicalProject(a3=[$0], b3=[$1], c3=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, t3]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])
+:- Exchange(distribution=[hash[a1]])
+: +- Calc(select=[a1, b1, c1, a2, b2, c2])
+: +- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a2)], select=[a2, b2, c2, a1, b1, c1], build=[right])
+: :- Exchange(distribution=[hash[a2]])
+: : +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[a2, b2, c2])
+: +- Exchange(distribution=[hash[a1]])
+: +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a1, b1, c1])
++- Exchange(distribution=[hash[a3]])
+ +- TableSourceScan(table=[[default_catalog, default_database, t3]], fields=[a3, b3, c3])
+
+== Optimized Execution Plan ==
+HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right])
+:- Exchange(distribution=[hash[a1]])
+: +- Calc(select=[a1, b1, c1, a2, b2, c2])
+: +- HashJoin(joinType=[LeftOuterJoin], where=[(a1 = a2)], select=[a2, b2, c2, a1, b1, c1], build=[right])
+: :- Exchange(distribution=[hash[a2]])
+: : +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[a2, b2, c2])
+: +- Exchange(distribution=[hash[a1]])
+: +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a1, b1, c1])
++- Exchange(distribution=[hash[a3]])
+ +- TableSourceScan(table=[[default_catalog, default_database, t3]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinReorderTest.xml
index 01774ba333c..e59941b5a7c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinReorderTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinReorderTest.xml
@@ -416,6 +416,201 @@ Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+- Exchange(distribution=[hash[a3]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinAntiJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ JOIN T3 ON a2 = a3
+ JOIN T4 ON a1 = a4
+ WHERE NOT EXISTS (SELECT a5 FROM T5 WHERE a1 = a5)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a1, $0)])
+ LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
++- Join(joinType=[InnerJoin], where=[((a1 = a4) AND (a4 = a2) AND (a3 = a4))], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2, a3]])
+ : +- Join(joinType=[InnerJoin], where=[((a2 = a3) AND (a1 = a2))], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2, a2]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3, a1]])
+ : +- Join(joinType=[LeftAntiJoin], where=[(a1 = a5)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- Join(joinType=[InnerJoin], where=[(a3 = a1)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a3]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ : +- Exchange(distribution=[hash[a5]])
+ : +- Calc(select=[a5])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4, a4, a4]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ LEFT OUTER JOIN T3 ON a1 = a3
+ JOIN T4 ON a1 = a4
+ LEFT OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[left])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[left])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[RightOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4]])
+ +- Join(joinType=[InnerJoin], where=[(a1 = a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Join(joinType=[LeftOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- Join(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a2]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ +- Exchange(distribution=[hash[a4]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinRightOuterJoinInnerJoinRightOuterJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ RIGHT OUTER JOIN T3 ON a1 = a3
+ JOIN T4 ON a1 = a4
+ RIGHT OUTER JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[right])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[right])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[LeftOuterJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4]])
+ +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+ +- Join(joinType=[InnerJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a4]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[hash[a1]])
+ +- Join(joinType=[RightOuterJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Join(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[a2]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[hash[a3]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinSemiJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ JOIN T2 ON a1 = a2
+ JOIN T3 ON a2 = a3
+ JOIN T4 ON a1 = a4
+ WHERE a1 IN (SELECT a5 FROM T5)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a5=[$0])
+ LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+})])
+ +- LogicalJoin(condition=[=($0, $9)], joinType=[inner])
+ :- LogicalJoin(condition=[=($3, $6)], joinType=[inner])
+ : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
++- Join(joinType=[InnerJoin], where=[((a1 = a4) AND (a4 = a2) AND (a3 = a4))], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2, a3]])
+ : +- Join(joinType=[InnerJoin], where=[((a2 = a3) AND (a1 = a2))], select=[a2, b2, c2, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a2, a2]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ : +- Exchange(distribution=[hash[a3, a1]])
+ : +- Join(joinType=[LeftSemiJoin], where=[(a1 = a5)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- Join(joinType=[InnerJoin], where=[(a3 = a1)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : : :- Exchange(distribution=[hash[a1]])
+ : : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : : +- Exchange(distribution=[hash[a3]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+ : +- Exchange(distribution=[hash[a5]])
+ : +- Calc(select=[a5])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4, a4, a4]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
]]>
</Resource>
</TestCase>
@@ -528,6 +723,102 @@ Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5])
+- Exchange(distribution=[hash[a4]])
+- Calc(select=[a4, b4])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ LEFT OUTER JOIN T2 ON a1 = a2
+ JOIN T3 ON a1 = a3
+ LEFT OUTER JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[left])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4]])
+ +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+ +- Join(joinType=[RightOuterJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a4]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[hash[a1]])
+ +- Join(joinType=[InnerJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Join(joinType=[LeftOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[a2]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[hash[a3]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightOuterJoinInnerJoinRightOuterJoinInnerJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM T1
+ RIGHT OUTER JOIN T2 ON a1 = a2
+ JOIN T3 ON a1 = a3
+ RIGHT OUTER JOIN T4 ON a1 = a4
+ JOIN T5 ON a4 = a5
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14])
++- LogicalJoin(condition=[=($9, $12)], joinType=[inner])
+ :- LogicalJoin(condition=[=($0, $9)], joinType=[right])
+ : :- LogicalJoin(condition=[=($0, $6)], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right])
+ : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]])
+ : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5])
++- Join(joinType=[InnerJoin], where=[(a4 = a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a5]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5])
+ +- Exchange(distribution=[hash[a4]])
+ +- Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4])
+ +- Join(joinType=[LeftOuterJoin], where=[(a1 = a4)], select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a4]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4])
+ +- Exchange(distribution=[hash[a1]])
+ +- Join(joinType=[InnerJoin], where=[(a1 = a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Join(joinType=[RightOuterJoin], where=[(a1 = a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ : :- Exchange(distribution=[hash[a1]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1])
+ : +- Exchange(distribution=[hash[a2]])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2])
+ +- Exchange(distribution=[hash[a3]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
index 692369198e8..ac87bc265ac 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/JoinReorderTestBase.scala
@@ -210,7 +210,7 @@ abstract class JoinReorderTestBase extends TableTestBase {
| LEFT OUTER JOIN T4 ON a1 = a4
| JOIN T5 ON a4 = a5
""".stripMargin
- // T1, T2, T3 can reorder
+ // T1, T2, T3, T4, T5 can reorder.
util.verifyExecPlan(sql)
}
@@ -251,7 +251,7 @@ abstract class JoinReorderTestBase extends TableTestBase {
| LEFT OUTER JOIN T4 ON a1 = a4
| LEFT OUTER JOIN T5 ON a4 = a5
""".stripMargin
- // can not reorder
+ // can reorder. Left outer joins will be converted into one multi join set by FlinkJoinToMultiJoinRule.
util.verifyExecPlan(sql)
}
@@ -283,6 +283,90 @@ abstract class JoinReorderTestBase extends TableTestBase {
util.verifyExecPlan(sql)
}
+ @Test
+ def testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | JOIN T2 ON a1 = a2
+ | LEFT OUTER JOIN T3 ON a1 = a3
+ | JOIN T4 ON a1 = a4
+ | LEFT OUTER JOIN T5 ON a4 = a5
+ """.stripMargin
+ // T1, T2, T3, T4, T5 can reorder.
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | LEFT OUTER JOIN T2 ON a1 = a2
+ | JOIN T3 ON a1 = a3
+ | LEFT OUTER JOIN T4 ON a1 = a4
+ | JOIN T5 ON a4 = a5
+ """.stripMargin
+ // T1, T2, T3, T4, T5 can reorder.
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testInnerJoinRightOuterJoinInnerJoinRightOuterJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | JOIN T2 ON a1 = a2
+ | RIGHT OUTER JOIN T3 ON a1 = a3
+ | JOIN T4 ON a1 = a4
+ | RIGHT OUTER JOIN T5 ON a4 = a5
+ """.stripMargin
+ // T1 and T2 can not reorder, but MJ(T1, T2), T3, T4 can reorder.
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testRightOuterJoinInnerJoinRightOuterJoinInnerJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | RIGHT OUTER JOIN T2 ON a1 = a2
+ | JOIN T3 ON a1 = a3
+ | RIGHT OUTER JOIN T4 ON a1 = a4
+ | JOIN T5 ON a4 = a5
+ """.stripMargin
+ // T1, T2, T3 can reorder, and MJ(T1, T2, T3), T4, T5 can reorder.
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testInnerJoinSemiJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | JOIN T2 ON a1 = a2
+ | JOIN T3 ON a2 = a3
+ | JOIN T4 ON a1 = a4
+ | WHERE a1 IN (SELECT a5 FROM T5)
+ """.stripMargin
+ // can not reorder. Reordering of semi joins will be supported in the future.
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testInnerJoinAntiJoin(): Unit = {
+ val sql =
+ s"""
+ |SELECT * FROM T1
+ | JOIN T2 ON a1 = a2
+ | JOIN T3 ON a2 = a3
+ | JOIN T4 ON a1 = a4
+ | WHERE NOT EXISTS (SELECT a5 FROM T5 WHERE a1 = a5)
+ """.stripMargin
+ // can not reorder
+ util.verifyExecPlan(sql)
+ }
+
@Test
def testDeriveNullFilterAfterJoinReorder(): Unit = {
val types = Array[TypeInformation[_]](Types.INT, Types.LONG)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
index e36fe7a5683..432e40ad989 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.RuleSets
import org.junit.{Before, Test}
-/** Tests for [[org.apache.calcite.rel.rules.JoinToMultiJoinRule]]. */
+/** Tests for [[org.apache.flink.table.planner.plan.rules.logical.FlinkJoinToMultiJoinRule]]. */
class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
private val util = batchTestUtil()
@@ -40,7 +40,7 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(RuleSets.ofList(CoreRules.JOIN_TO_MULTI_JOIN, CoreRules.PROJECT_MULTI_JOIN_MERGE))
+ .add(RuleSets.ofList(FlinkJoinToMultiJoinRule.INSTANCE, CoreRules.PROJECT_MULTI_JOIN_MERGE))
.build()
)
@@ -50,14 +50,136 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
}
@Test
- def testDoesNotMatchSemiJoin(): Unit = {
+ def testInnerJoinInnerJoin(): Unit = {
+ // Can translate join to multi join.
+ val sqlQuery = "SELECT * FROM T1, T2, T3 WHERE a = c AND a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testInnerJoinLeftOuterJoin(): Unit = {
+ val sqlQuery = "SELECT * FROM T1 JOIN T2 ON a =c LEFT OUTER JOIN T3 ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testInnerJoinRightOuterJoin(): Unit = {
+ // Cannot translate to one multi join set because the left input of a right outer join can generate nulls.
+ val sqlQuery = "SELECT * FROM T1 JOIN T2 ON a =c RIGHT OUTER JOIN T3 ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testLeftOuterJoinLeftOuterJoin(): Unit = {
+ // Can translate join to multi join.
+ val sqlQuery =
+ "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testLeftOuterJoinRightOuterJoin(): Unit = {
+ // Cannot translate join to multi join.
+ val sqlQuery =
+ "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testLeftOuterJoinInnerJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testRightOuterJoinRightOuterJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testSubRightOuterJoinQuery(): Unit = {
+ // This case will be converted into one multi join set.
+ val sqlQuery =
+ "SELECT * FROM T3 RIGHT OUTER JOIN (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t ON t.a = T3.e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testRightOuterJoinLeftOuterJoin(): Unit = {
+ // Cannot translate join to multi join because the left input of the join is a right outer join.
+ val sqlQuery =
+ "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testRightOuterJoinInnerJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testFullOuterJoin(): Unit = {
+ // Cannot translate join to multi join.
+ val sqlQuery = "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c, T3 WHERE a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testFullOuterJoinInnerJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testFullOuterJoinLeftOuterJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c LEFT OUTER JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testFullOuterJoinRightOuterJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c RIGHT OUTER JOIN (SELECT * FROM T3) ON a = e"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testFullOuterJoinSemiJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 FULL OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testInnerJoinSemiJoin(): Unit = {
val sqlQuery =
"SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) t WHERE a IN (SELECT e FROM T3)"
util.verifyRelPlan(sqlQuery)
}
@Test
- def testDoesNotMatchAntiJoin(): Unit = {
+ def testLeftOuterJoinSemiJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testRightOuterJoinSemiJoin(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c WHERE a IN (SELECT e FROM T3)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testInnerJoinAntiJoin(): Unit = {
val sqlQuery =
"""
|SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) t
@@ -65,4 +187,85 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
""".stripMargin
util.verifyRelPlan(sqlQuery)
}
+
+ @Test
+ def testLeftOuterJoinAntiJoin(): Unit = {
+ // NOT EXISTS over a LEFT OUTER JOIN derived table (the anti join case).
+ util.verifyRelPlan(
+ """
+ |SELECT * FROM (SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c) t
+ |WHERE NOT EXISTS (SELECT e FROM T3 WHERE a = e)
+ """.stripMargin)
+ }
+
+ @Test
+ def testRightOuterJoinAntiJoin(): Unit = {
+ // NOT EXISTS over a RIGHT OUTER JOIN derived table (the anti join case).
+ util.verifyRelPlan(
+ """
+ |SELECT * FROM (SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c) t
+ |WHERE NOT EXISTS (SELECT e FROM T3 WHERE a = e)
+ """.stripMargin)
+ }
+
+ @Test
+ def testInnerJoinLeftOuterJoinInnerJoinLeftOuterJoin(): Unit = {
+ // Two extra (Int, Long) sources so five inputs can be chained as J, LJ, J, LJ.
+ util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+ util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+ val sqlQuery =
+ """
+ |SELECT * FROM T1 JOIN T2 ON a = c LEFT OUTER JOIN
+ |(SELECT * FROM T3) ON a = e JOIN
+ |(SELECT * FROM T4) ON a = g LEFT OUTER JOIN
+ |(SELECT * FROM T5) ON a = i
+ """.stripMargin
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testLeftOuterJoinInnerJoinLeftOuterJoinInnerJoin(): Unit = {
+ // Two extra (Int, Long) sources so five inputs can be chained as LJ, J, LJ, J.
+ util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+ util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+ util.verifyRelPlan(
+ """
+ |SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c JOIN
+ |(SELECT * FROM T3) ON a = e LEFT OUTER JOIN
+ |(SELECT * FROM T4) ON a = g JOIN
+ |(SELECT * FROM T5) ON a = i
+ """.stripMargin)
+ }
+
+ @Test
+ def testInnerJoinRightOuterJoinInnerJoinRightOuterJoin(): Unit = {
+ // Two extra (Int, Long) sources so five inputs can be chained as J, RJ, J, RJ.
+ util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+ util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+ util.verifyRelPlan(
+ """
+ |SELECT * FROM T1 JOIN T2 ON a = c RIGHT OUTER JOIN
+ |(SELECT * FROM T3) ON a = e JOIN
+ |(SELECT * FROM T4) ON a = g RIGHT OUTER JOIN
+ |(SELECT * FROM T5) ON a = i
+ """.stripMargin)
+ }
+
+ @Test
+ def testRightOuterJoinInnerJoinRightOuterJoinInnerJoin(): Unit = {
+ // Two extra (Int, Long) sources so five inputs can be chained as RJ, J, RJ, J.
+ util.addTableSource[(Int, Long)]("T4", 'g, 'h)
+ util.addTableSource[(Int, Long)]("T5", 'i, 'j)
+
+ util.verifyRelPlan(
+ """
+ |SELECT * FROM T1 RIGHT OUTER JOIN T2 ON a = c JOIN
+ |(SELECT * FROM T3) ON a = e RIGHT OUTER JOIN
+ |(SELECT * FROM T4) ON a = g JOIN
+ |(SELECT * FROM T5) ON a = i
+ """.stripMargin)
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
index 62a06e99d45..0c954918a29 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
@@ -24,9 +24,12 @@ import org.apache.flink.api.common.typeutils.TypeComparator
import org.apache.flink.api.dag.Transformation
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
import org.apache.flink.streaming.api.transformations.{LegacySinkTransformation, OneInputTransformation, TwoInputTransformation}
+import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.api.internal.{StatementSetImpl, TableEnvironmentInternal}
+import org.apache.flink.table.plan.stats.TableStats
import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.expressions.utils.FuncWithOpen
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.runtime.batch.sql.join.JoinType.{BroadcastHashJoin, HashJoin, JoinType, NestedLoopJoin, SortMergeJoin}
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -770,6 +773,214 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
)
}
+ @Test
+ def testLeftOuterJoinReorder(): Unit = {
+ // Checks query results once the joins are flattened into a multi join and reordered.
+ tEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED,
+ Boolean.box(true))
+
+ // Row-count statistics drive the reorder: the written order LJ(LJ(LJ(T5, T3), T2), T1)
+ // is expected to become RJ(T1, LJ(T2, LJ(T5, T3))).
+ def withRowCount(rowCount: Long): FlinkStatistic =
+ new FlinkStatistic(new TableStats(rowCount))
+ registerCollection(
+ "T5",
+ data5,
+ type5,
+ "d, e, f, g, h",
+ nullablesOfData5,
+ withRowCount(1000L))
+ registerCollection(
+ "T3",
+ smallData3,
+ type3,
+ "a, b, c",
+ nullablesOfSmallData3,
+ withRowCount(100L))
+ registerCollection(
+ "T2",
+ data2,
+ type2,
+ "d, e, f, g, h",
+ nullablesOfData2,
+ withRowCount(10L))
+ registerCollection(
+ "T1",
+ data2,
+ type2,
+ "d, e, f, g, h",
+ nullablesOfData2,
+ withRowCount(100000L))
+
+ val query =
+ """
+ |SELECT T5.g, T3b.c, T2b.g FROM T5 LEFT OUTER JOIN
+ |(SELECT * FROM T3 WHERE a > 0 ) T3b ON T3b.b = T5.e LEFT OUTER JOIN
+ |(SELECT * FROM T2 WHERE d > 0) T2b ON T3b.b = T2b.e LEFT OUTER JOIN
+ |(SELECT * FROM T1) T1b ON T3b.b = T1b.e
+ |""".stripMargin
+ // T5 rows that find a T3b partner (and, through it, a T2b partner).
+ val matched = Seq(
+ row("Hallo", "Hi", "Hallo"),
+ row("Hallo Welt", "Hello world", "Hallo Welt"),
+ row("Hallo Welt", "Hello", "Hallo Welt"))
+ // All other T5 rows are kept by the LEFT OUTER JOINs with nulls on the T3b/T2b columns.
+ val unmatched = Seq(
+ "Hallo Welt wie gehts?", "Hallo Welt wie", "ABC", "BCD", "CDE", "DEF", "EFG",
+ "FGH", "GHI", "HIJ", "IJK", "JKL", "KLM").map(g => row(g, null, null))
+ checkResult(query, matched ++ unmatched)
+ }
+
+ @Test
+ def testRightOuterJoinReorder(): Unit = {
+ // This test is used to test the result after join to multi join and join reorder.
+ tEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED,
+ Boolean.box(true))
+ // Register table with stats to support join reorder,
+ // join order RJ(J(RJ(T5, T3), T2), T1) will reorder to LJ(RJ(T5, J(T3, T2)), T1).
+ // NOTE(review): RJ(..., T1) keeps every T1 row (the all-null rows expected below), so the
+ // reordered plan presumably has T1 on the preserved side, e.g. LJ(T1, RJ(T5, J(T3, T2)));
+ // confirm against the optimized plan.
+ registerCollection(
+ "T5",
+ data5,
+ type5,
+ "d, e, f, g, h",
+ nullablesOfData5,
+ new FlinkStatistic(new TableStats(1000L)))
+ registerCollection(
+ "T3",
+ smallData3,
+ type3,
+ "a, b, c",
+ nullablesOfSmallData3,
+ new FlinkStatistic(new TableStats(100L)))
+ registerCollection(
+ "T2",
+ data2,
+ type2,
+ "d, e, f, g, h",
+ nullablesOfData2,
+ new FlinkStatistic(new TableStats(10L)))
+ registerCollection(
+ "T1",
+ data2,
+ type2,
+ "d, e, f, g, h",
+ nullablesOfData2,
+ new FlinkStatistic(new TableStats(100000L)))
+
+ // The 13 all-null rows are T1 rows kept by the outer RIGHT OUTER JOIN without a match.
+ val unmatchedT1Rows = Seq.fill(13)(row(null, null, null))
+ checkResult(
+ """
+ |SELECT T5.g, T3b.c, T2b.g FROM T5 RIGHT OUTER JOIN
+ |(SELECT * FROM T3 WHERE T3.a > 0) T3b ON T3b.b = T5.e JOIN
+ |(SELECT * FROM T2 WHERE T2.d > 0) T2b ON T3b.b = T2b.e RIGHT OUTER JOIN
+ |(SELECT * FROM T1) T1b ON T3b.b = T1b.e
+ |""".stripMargin,
+ Seq(
+ row("Hallo Welt", "Hello world", "Hallo Welt"),
+ row("Hallo Welt", "Hello", "Hallo Welt"),
+ row("Hallo", "Hi", "Hallo")
+ ) ++ unmatchedT1Rows
+ )
+ }
+
+ @Test
+ def testRightOuterJoinRightOuterJoinCannotReorder(): Unit = {
+ // This test is used to test the result after join to multi join and join reorder.
+ tEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED,
+ Boolean.box(true))
+ registerCollection("Table2", data2, type2, "d, e, f, g, h", nullablesOfData2)
+ // FlinkJoinToMultiJoinRule puts this query into one multi join set, but it cannot be
+ // reordered because the join condition of the sub right outer join query comes from the
+ // null-generating side.
+ checkResult(
+ """
+ |SELECT Table5.g, c, t.g FROM Table5 RIGHT OUTER JOIN
+ |(SELECT * FROM SmallTable3 RIGHT OUTER JOIN Table2 ON b = Table2.e) t ON t.e = Table5.e
+ |""".stripMargin,
+ Seq(
+ row("ABC", null, "ABC"),
+ row("BCD", null, "BCD"),
+ row("CDE", null, "CDE"),
+ row("DEF", null, "DEF"),
+ row("EFG", null, "EFG"),
+ row("FGH", null, "FGH"),
+ row("GHI", null, "GHI"),
+ row("HIJ", null, "HIJ"),
+ row("Hallo Welt wie gehts?", null, "Hallo Welt wie gehts?"),
+ row("Hallo Welt wie", null, "Hallo Welt wie"),
+ row("Hallo Welt", "Hello", "Hallo Welt"),
+ row("Hallo Welt", "Hello world", "Hallo Welt"),
+ row("Hallo", "Hi", "Hallo"),
+ row("IJK", null, "IJK"),
+ row("JKL", null, "JKL"),
+ row("KLM", null, "KLM")
+ )
+ )
+ }
+
+ @Test
+ def testInnerJoinReorder(): Unit = {
+ // Checks query results once the inner joins are merged into a multi join and reordered.
+ tEnv.getConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED,
+ Boolean.box(true))
+
+ // Row-count statistics let the optimizer turn the written J(J(T5, T3), T2)
+ // into J(T5, J(T3, T2)).
+ def withRowCount(rowCount: Long): FlinkStatistic =
+ new FlinkStatistic(new TableStats(rowCount))
+ registerCollection(
+ "T5",
+ data5,
+ type5,
+ "d, e, f, g, h",
+ nullablesOfData5,
+ withRowCount(1000L))
+ registerCollection(
+ "T3",
+ smallData3,
+ type3,
+ "a, b, c",
+ nullablesOfSmallData3,
+ withRowCount(100L))
+ registerCollection(
+ "T2",
+ data2,
+ type2,
+ "d, e, f, g, h",
+ nullablesOfData2,
+ withRowCount(10L))
+
+ val query =
+ """
+ |SELECT T5.g, c, T2.g FROM T5 JOIN T3 ON b = T5.e
+ |JOIN T2 ON b = T2.e WHERE T2.d > 0 AND T5.d > 0
+ |""".stripMargin
+ val expected = Seq(
+ row("Hallo", "Hi", "Hallo"),
+ row("Hallo Welt", "Hello", "Hallo Welt"),
+ row("Hallo Welt", "Hello world", "Hallo Welt"))
+ checkResult(query, expected)
+ }
+
@Test
def testJoinWithAggregation(): Unit = {
checkResult("SELECT COUNT(g), COUNT(b) FROM SmallTable3, Table5 WHERE a = d", Seq(row(6L, 6L)))