You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/16 13:07:35 UTC
[flink] branch master updated: [FLINK-26505][hive] Support non equality condition for left semi join in Hive dialect. (#18994)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 479ddeaa02a [FLINK-26505][hive] Support non equality condition for left semi join in Hive dialect. (#18994)
479ddeaa02a is described below
commit 479ddeaa02a5c571b252612205a92e3265c2435e
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Aug 16 21:07:26 2022 +0800
[FLINK-26505][hive] Support non equality condition for left semi join in Hive dialect. (#18994)
---
.../delegation/hive/HiveParserCalcitePlanner.java | 33 ++++++++++++++--------
.../src/test/resources/query-test/join.q | 6 ++++
2 files changed, 27 insertions(+), 12 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index 8b657cab3ce..e1e5a452d93 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -57,7 +57,6 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelCollation;
@@ -113,6 +112,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOptUtil;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.JoinType;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -570,25 +570,18 @@ public class HiveParserCalcitePlanner {
List<RexNode> rightJoinKeys = new ArrayList<>();
RexNode nonEquiConds =
- RelOptUtil.splitJoinCondition(
+ HiveRelOptUtil.splitHiveJoinCondition(
sysFieldList,
- leftRel,
- rightRel,
+ Arrays.asList(leftRel, rightRel),
joinCondRex,
- leftJoinKeys,
- rightJoinKeys,
+ Arrays.asList(leftJoinKeys, rightJoinKeys),
null,
null);
- if (!nonEquiConds.isAlwaysTrue()) {
- throw new SemanticException(
- "Non equality condition not supported in Semi-Join" + nonEquiConds);
- }
-
RelNode[] inputRels = new RelNode[] {leftRel, rightRel};
final List<Integer> leftKeys = new ArrayList<>();
final List<Integer> rightKeys = new ArrayList<>();
- joinCondRex =
+ RexNode remainingEquiCond =
HiveParserUtils.projectNonColumnEquiConditions(
RelFactories.DEFAULT_PROJECT_FACTORY,
inputRels,
@@ -597,6 +590,22 @@ public class HiveParserCalcitePlanner {
0,
leftKeys,
rightKeys);
+ // Shift the right-input field references in nonEquiConds if the previous call replaced the left input with one of a different field count
+ if (inputRels[0] != leftRel) {
+ nonEquiConds =
+ RexUtil.shift(
+ nonEquiConds,
+ leftRel.getRowType().getFieldCount(),
+ inputRels[0].getRowType().getFieldCount()
+ - leftRel.getRowType().getFieldCount());
+ }
+ joinCondRex =
+ remainingEquiCond != null
+ ? RexUtil.composeConjunction(
+ cluster.getRexBuilder(),
+ Arrays.asList(remainingEquiCond, nonEquiConds),
+ false)
+ : nonEquiConds;
topRel =
LogicalJoin.create(
inputRels[0],
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/query-test/join.q b/flink-connectors/flink-connector-hive/src/test/resources/query-test/join.q
index 8712ed6ad52..dc2d8f6f344 100644
--- a/flink-connectors/flink-connector-hive/src/test/resources/query-test/join.q
+++ b/flink-connectors/flink-connector-hive/src/test/resources/query-test/join.q
@@ -24,6 +24,12 @@ select * from foo left semi join bar on foo.y=bar.i;
[+I[1, 1], +I[2, 2]]
+select count(1) from (select x from foo where x = 1) foo1 left semi join (select i from bar where i = 1) bar2 on 1 = 1;
+[+I[1]]
+
+select * from foo left semi join bar on (foo.x + bar.i > 4);
+[+I[3, 3], +I[4, 4], +I[5, 5]]
+
select * from (select a.value, a.* from (select * from src) a join (select * from src) b on a.key = b.key) t;
[+I[val1, 1, val1], +I[val2, 2, val2], +I[val3, 3, val3]]