You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/16 13:07:35 UTC

[flink] branch master updated: [FLINK-26505][hive] Support non equality condition for left semi join in Hive dialect. (#18994)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 479ddeaa02a [FLINK-26505][hive] Support non equality condition for left semi join in Hive dialect. (#18994)
479ddeaa02a is described below

commit 479ddeaa02a5c571b252612205a92e3265c2435e
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Aug 16 21:07:26 2022 +0800

    [FLINK-26505][hive] Support non equality condition for left semi join in Hive dialect. (#18994)
---
 .../delegation/hive/HiveParserCalcitePlanner.java  | 33 ++++++++++++++--------
 .../src/test/resources/query-test/join.q           |  6 ++++
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index 8b657cab3ce..e1e5a452d93 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -57,7 +57,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.plan.ViewExpanders;
 import org.apache.calcite.rel.RelCollation;
@@ -113,6 +112,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOptUtil;
 import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
 import org.apache.hadoop.hive.ql.parse.JoinType;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -570,25 +570,18 @@ public class HiveParserCalcitePlanner {
             List<RexNode> rightJoinKeys = new ArrayList<>();
 
             RexNode nonEquiConds =
-                    RelOptUtil.splitJoinCondition(
+                    HiveRelOptUtil.splitHiveJoinCondition(
                             sysFieldList,
-                            leftRel,
-                            rightRel,
+                            Arrays.asList(leftRel, rightRel),
                             joinCondRex,
-                            leftJoinKeys,
-                            rightJoinKeys,
+                            Arrays.asList(leftJoinKeys, rightJoinKeys),
                             null,
                             null);
 
-            if (!nonEquiConds.isAlwaysTrue()) {
-                throw new SemanticException(
-                        "Non equality condition not supported in Semi-Join" + nonEquiConds);
-            }
-
             RelNode[] inputRels = new RelNode[] {leftRel, rightRel};
             final List<Integer> leftKeys = new ArrayList<>();
             final List<Integer> rightKeys = new ArrayList<>();
-            joinCondRex =
+            RexNode remainingEquiCond =
                     HiveParserUtils.projectNonColumnEquiConditions(
                             RelFactories.DEFAULT_PROJECT_FACTORY,
                             inputRels,
@@ -597,6 +590,22 @@ public class HiveParserCalcitePlanner {
                             0,
                             leftKeys,
                             rightKeys);
+            // Shift right-input refs in nonEquiConds if the call above widened the left input
+            if (inputRels[0] != leftRel) {
+                nonEquiConds =
+                        RexUtil.shift(
+                                nonEquiConds,
+                                leftRel.getRowType().getFieldCount(),
+                                inputRels[0].getRowType().getFieldCount()
+                                        - leftRel.getRowType().getFieldCount());
+            }
+            joinCondRex =
+                    remainingEquiCond != null
+                            ? RexUtil.composeConjunction(
+                                    cluster.getRexBuilder(),
+                                    Arrays.asList(remainingEquiCond, nonEquiConds),
+                                    false)
+                            : nonEquiConds;
             topRel =
                     LogicalJoin.create(
                             inputRels[0],
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/query-test/join.q b/flink-connectors/flink-connector-hive/src/test/resources/query-test/join.q
index 8712ed6ad52..dc2d8f6f344 100644
--- a/flink-connectors/flink-connector-hive/src/test/resources/query-test/join.q
+++ b/flink-connectors/flink-connector-hive/src/test/resources/query-test/join.q
@@ -24,6 +24,12 @@ select * from foo left semi join bar on foo.y=bar.i;
 
 [+I[1, 1], +I[2, 2]]
 
+select count(1) from (select x from foo where x = 1) foo1 left semi join (select i from bar where i = 1) bar2 on 1 = 1;
+[+I[1]]
+
+select * from foo left semi join bar on (foo.x + bar.i > 4);
+[+I[3, 3], +I[4, 4], +I[5, 5]]
+
 select * from (select a.value, a.* from (select * from src) a join (select * from src) b on a.key = b.key) t;
 
 [+I[val1, 1, val1], +I[val2, 2, val2], +I[val3, 3, val3]]