Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/22 12:52:24 UTC

[GitHub] [flink] godfreyhe commented on a diff in pull request #21487: [FLINK-30270][table-planner] Fix FlinkJoinToMultiJoinRule incorrectly combines Left/Right outer join to MultiJoin error

godfreyhe commented on code in PR #21487:
URL: https://github.com/apache/flink/pull/21487#discussion_r1055192928


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java:
##########
@@ -178,6 +193,60 @@ public void onMatch(RelOptRuleCall call) {
         call.transformTo(multiJoin);
     }
 
+    private void buildInputNullGenFieldList(
+            RelNode left, RelNode right, JoinRelType joinType, List<Boolean> isNullGenFieldList) {
+        if (joinType == JoinRelType.INNER) {
+            buildNullGenFieldList(left, isNullGenFieldList);
+            buildNullGenFieldList(right, isNullGenFieldList);
+        } else if (joinType == JoinRelType.LEFT) {
+            // left judge.
+            buildNullGenFieldList(left, isNullGenFieldList);
+
+            for (int i = 0; i < right.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+        } else if (joinType == JoinRelType.RIGHT) {
+            for (int i = 0; i < left.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+
+            // right judge.
+            buildNullGenFieldList(right, isNullGenFieldList);
+        } else if (joinType == JoinRelType.FULL) {
+            for (int i = 0; i < left.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+            for (int i = 0; i < right.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+        }
+    }
+
+    private void buildNullGenFieldList(RelNode rel, List<Boolean> isNullGenFieldList) {
+        MultiJoin multiJoin = rel instanceof MultiJoin ? (MultiJoin) rel : null;
+        if (multiJoin == null) {
+            // other operator.
+            for (int i = 0; i < rel.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(false);
+            }
+        } else {
+            List<RelNode> inputs = multiJoin.getInputs();
+            List<JoinRelType> joinTypes = multiJoin.getJoinTypes();
+            for (int i = 0; i < inputs.size() - 1; i++) {
+                if (joinTypes.get(i) == JoinRelType.RIGHT) {

Review Comment:
   Can you explain why we only handle RIGHT here?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java:
##########
@@ -378,25 +513,53 @@ private List<RexNode> combineJoinFilters(Join join, RelNode left, RelNode right)
      * @param nullGenerating true if the input is null generating
      * @return true if the input can be combined into a parent MultiJoin
      */
-    private boolean canCombine(RelNode input, JoinRelType joinType, boolean nullGenerating) {
+    private boolean canCombine(
+            RelNode input,
+            ImmutableIntList joinKeys,
+            JoinRelType joinType,
+            boolean nullGenerating,
+            boolean isLeft,
+            List<Boolean> inputNullGenFieldList,
+            int beginIndex) {
+        if (inputNullGenFieldList == null) {
+            // semi and anti join

Review Comment:
   `inputNullGenFieldList` is never null here, so this branch is unreachable.



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.xml:
##########
@@ -548,6 +549,46 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(c, d)]]])
    +- LogicalProject(e=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiLeftOuterJoinWithAllKeyInLeft">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM T1 LEFT OUTER JOIN T2 ON a = c LEFT OUTER JOIN 
+(SELECT * FROM T3) ON a = e LEFT OUTER JOIN
+(SELECT * FROM T4) ON a = g LEFT OUTER JOIN
+(SELECT * FROM T5) ON a = i

Review Comment:
   The query can be simplified as:
   
   SELECT * FROM T1 
   LEFT OUTER JOIN T2 ON a = c 
   LEFT OUTER JOIN T3 ON a = e
   LEFT OUTER JOIN T4 ON a = g 
   LEFT OUTER JOIN T5 ON a = i



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java:
##########
@@ -178,6 +193,60 @@ public void onMatch(RelOptRuleCall call) {
         call.transformTo(multiJoin);
     }
 
+    private void buildInputNullGenFieldList(
+            RelNode left, RelNode right, JoinRelType joinType, List<Boolean> isNullGenFieldList) {
+        if (joinType == JoinRelType.INNER) {
+            buildNullGenFieldList(left, isNullGenFieldList);
+            buildNullGenFieldList(right, isNullGenFieldList);
+        } else if (joinType == JoinRelType.LEFT) {
+            // left judge.
+            buildNullGenFieldList(left, isNullGenFieldList);
+
+            for (int i = 0; i < right.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+        } else if (joinType == JoinRelType.RIGHT) {
+            for (int i = 0; i < left.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+
+            // right judge.
+            buildNullGenFieldList(right, isNullGenFieldList);
+        } else if (joinType == JoinRelType.FULL) {
+            for (int i = 0; i < left.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+            for (int i = 0; i < right.getRowType().getFieldCount(); i++) {
+                isNullGenFieldList.add(true);
+            }
+        }

Review Comment:
   nit: it would be better to add some comments explaining the behavior in the `else` branch.
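For illustration only, here is a minimal standalone sketch (not Flink or Calcite code) of the dispatch in `buildInputNullGenFieldList`, written as a `switch` so that the fall-through case is explicit and documented. Assumptions: the `JoinKind` enum and the `nullGenFlags`/`addFlags` helpers are hypothetical. Inputs are modeled only by their field counts and are treated as plain (non-`MultiJoin`) nodes, for which `buildNullGenFieldList` in the diff appends `false` for every field.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NullGenFieldListSketch {

    // Hypothetical stand-in for Calcite's JoinRelType; only the constant names are borrowed.
    enum JoinKind { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

    // Appends `count` copies of `flag` to `flags`.
    static void addFlags(List<Boolean> flags, int count, boolean flag) {
        flags.addAll(Collections.nCopies(count, flag));
    }

    // Assumes both inputs are plain (non-MultiJoin) nodes, so a preserved side gets `false`.
    static List<Boolean> nullGenFlags(JoinKind joinType, int leftFields, int rightFields) {
        List<Boolean> flags = new ArrayList<>();
        switch (joinType) {
            case INNER:
                // Neither side is padded with nulls.
                addFlags(flags, leftFields, false);
                addFlags(flags, rightFields, false);
                break;
            case LEFT:
                // Left side preserved; right-side fields may be null-generated.
                addFlags(flags, leftFields, false);
                addFlags(flags, rightFields, true);
                break;
            case RIGHT:
                // Right side preserved; left-side fields may be null-generated.
                addFlags(flags, leftFields, true);
                addFlags(flags, rightFields, false);
                break;
            case FULL:
                // Either side may be padded with nulls.
                addFlags(flags, leftFields, true);
                addFlags(flags, rightFields, true);
                break;
            default:
                // Any other join type (SEMI, ANTI): like the if/else-if chain in the diff,
                // nothing is appended for these types.
                break;
        }
        return flags;
    }

    public static void main(String[] args) {
        System.out.println(nullGenFlags(JoinKind.LEFT, 2, 1)); // [false, false, true]
        System.out.println(nullGenFlags(JoinKind.SEMI, 2, 1)); // []
    }
}
```

Making the `default` case explicit keeps the "append nothing" behavior for semi/anti joins visible at the call site instead of leaving it implicit.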



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRuleTest.scala:
##########
@@ -268,4 +291,19 @@ class FlinkJoinToMultiJoinRuleTest extends TableTestBase {
         """.stripMargin
     util.verifyRelPlan(sqlQuery)
   }
+
+  @Test
+  def testMultiLeftOuterJoinWithAllKeyInLeft: Unit = {

Review Comment:
   Is there any test with semi/anti joins to verify the change?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkJoinToMultiJoinRule.java:
##########
@@ -348,22 +456,49 @@ private void copyOuterJoinInfo(
      * @param right Right input of the join
      * @return combined join filters AND-ed together
      */
-    private List<RexNode> combineJoinFilters(Join join, RelNode left, RelNode right) {
+    private List<RexNode> combineJoinFilters(
+            Join join, RelNode left, RelNode right, List<Boolean> inputNullGenFieldList) {
         JoinRelType joinType = join.getJoinType();
+        JoinInfo joinInfo = join.analyzeCondition();
+        ImmutableIntList leftKeys = joinInfo.leftKeys;
+        ImmutableIntList rightKeys = joinInfo.rightKeys;
 
         // AND the join condition if this isn't a left or right outer join; In those cases, the
         // outer join condition is already tracked separately.
         final List<RexNode> filters = new ArrayList<>();
         if ((joinType != JoinRelType.LEFT) && (joinType != JoinRelType.RIGHT)) {
             filters.add(join.getCondition());
         }
-        if (canCombine(left, joinType, joinType.generatesNullsOnLeft())) {
-            filters.add(((MultiJoin) left).getJoinFilter());
+        if (canCombine(
+                left,
+                leftKeys,
+                joinType,
+                joinType.generatesNullsOnLeft(),
+                true,
+                inputNullGenFieldList,
+                0)) {
+            final MultiJoin multiJoin =
+                    left instanceof Project

Review Comment:
   Can we handle Projection now? Does any test cover this branch?



-- 
This is an automated message from the Apache Git Service.