Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2017/10/19 10:26:37 UTC
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
GitHub user fhueske opened a pull request:
https://github.com/apache/flink/pull/4858
[FLINK-7755] [table] Fix NULL handling in batch joins.
## What is the purpose of the change
This PR fixes a couple of issues with Table API / SQL batch joins:
- Proper support for joining null values for inner and outer joins
- Support for non-equi join predicates in outer joins (at least one equi-join predicate is required)
- Support for local predicates on the outer input of outer joins (at least one equi-join predicate is required)
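
The first item comes down to SQL semantics: `NULL = NULL` evaluates to UNKNOWN rather than TRUE, so rows whose join keys are null must not be paired. A minimal sketch in plain Scala (no Flink dependency) follows. The names `SqlBool`, `sqlEquals`, `sqlAnd`, and `matches` are hypothetical and only serve the illustration; `None` stands in for SQL NULL / UNKNOWN.

```scala
object ThreeValuedLogic {
  // SQL boolean: Some(true), Some(false), or None for UNKNOWN (illustrative encoding)
  type SqlBool = Option[Boolean]

  // SQL equality: the result is UNKNOWN as soon as one operand is NULL
  def sqlEquals[A](l: Option[A], r: Option[A]): SqlBool =
    for (a <- l; b <- r) yield a == b

  // SQL AND: FALSE dominates UNKNOWN, UNKNOWN dominates TRUE
  def sqlAnd(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  // A join emits a pair only if the whole predicate is TRUE (not FALSE, not UNKNOWN)
  def matches(predicate: SqlBool): Boolean = predicate.contains(true)
}
```

With this encoding, `matches(sqlEquals(None, None))` is `false`, whereas a plain value comparison of two null keys would report a match.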
## Brief change log
- Inner & Outer Joins: Evaluate all join predicates (including equi-join predicates) in a code-gen'd function for correct handling of three-valued logic
- Outer joins: translate outer joins into a sequence of GroupReduce -> OuterJoin -> GroupReduce.
  - The first GroupReduce groups on the full input row and deduplicates the outer side(s) of the join. A count of the number of deduplicated rows is kept.
  - The OuterJoin evaluates the join predicate and computes possible join pairs of left and right rows. The non-outer element of the pair can be null if the join predicate does not match.
  - The second GroupReduce groups again on the full input row and computes the join result for each outer row. If the row was not matched with any inner row, it produces a null-padded result.
  - The plan for left and right outer joins requires only a single initial partitioning and sort of each input. All operators can reuse the initial sort and produce a sorted result again. A full outer join requires an additional partitioning and sorting step.
- Checks for outer join translation are removed to allow outer joins with non-equi and local predicates.
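
The GroupReduce -> OuterJoin -> GroupReduce translation described above can be modeled on plain Scala collections for the left outer join case. This is a simplified sketch under assumptions, not the PR's implementation: it uses no Flink operators, a single equi-join key per side, and integer-only rows. The names `Row`, `dedup`, `joinPairs`, `expand`, and `demo` are hypothetical.

```scala
object LeftOuterJoinSketch {
  // A row is a vector of nullable ints; None models SQL NULL (assumption for brevity).
  type Row = Vector[Option[Int]]

  // Step 1 (first GroupReduce): fold identical outer rows into (row -> count).
  def dedup(rows: Seq[Row]): Map[Row, Int] =
    rows.groupBy(identity).map { case (row, group) => row -> group.size }

  // Step 2 (OuterJoin): pair each distinct left row with the right rows that share
  // its non-null key. The right element is None when the full predicate is not TRUE,
  // or when there is no right row with an equal key at all.
  def joinPairs(
      left: Map[Row, Int],
      right: Seq[Row],
      leftKey: Int,
      rightKey: Int,
      pred: (Row, Row) => Boolean): Seq[(Row, Option[Row], Int)] =
    left.toSeq.flatMap { case (l, cnt) =>
      val candidates =
        right.filter(r => l(leftKey).isDefined && l(leftKey) == r(rightKey))
      if (candidates.isEmpty) Seq((l, Option.empty[Row], cnt))
      else candidates.map(r => (l, if (pred(l, r)) Some(r) else None, cnt))
    }

  // Step 3 (second GroupReduce): group on the left row again. Emit every matched
  // pair, or one null-padded row if nothing matched, each `cnt` times.
  def expand(pairs: Seq[(Row, Option[Row], Int)], rightArity: Int): Seq[Row] =
    pairs.groupBy(_._1).toSeq.flatMap { case (l, group) =>
      val cnt = group.head._3
      val matched = group.collect { case (_, Some(r), _) => r }
      val joined: Seq[Row] =
        if (matched.isEmpty) Seq(l ++ Vector.fill[Option[Int]](rightArity)(None))
        else matched.map(r => l ++ r)
      joined.flatMap(row => Seq.fill(cnt)(row))
    }

  // Example: l.f0 = r.f0 AND l.f1 < r.f1, with a duplicated left row and null keys.
  def demo: Seq[Row] = {
    val left: Seq[Row] =
      Seq(Vector(Some(1), Some(10)), Vector(Some(1), Some(10)), Vector(None, Some(5)))
    val right: Seq[Row] =
      Seq(Vector(Some(1), Some(7)), Vector(Some(1), Some(20)), Vector(None, Some(3)))
    // Non-equi predicate; a NULL operand makes the comparison not TRUE.
    val pred = (l: Row, r: Row) =>
      (for (a <- l(1); b <- r(1)) yield a < b).contains(true)
    expand(joinPairs(dedup(left), right, 0, 0, pred), rightArity = 2)
  }
}
```

In this model, `LeftOuterJoinSketch.demo` yields three rows in unspecified order: the joined row (1, 10, 1, 20) twice, once per duplicate of the left row, and the null-padded row (NULL, 5, NULL, NULL) once. Keeping the duplicate count next to the deduplicated row is what allows the last step to restore the original multiplicity of the outer side.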
## Verifying this change
- added ITCases for the new outer join features to `JoinITCase`
- added plan tests for Table API and SQL for the new outer join features
- updated validation tests
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
## Documentation
The documentation does not need to be adjusted because the outer join limitations were not documented.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fhueske/flink tableBatchNullJoin
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4858.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4858
----
commit 1434d8d7debe207e4b8350199eded4e678885571
Author: Fabian Hueske <fh...@apache.org>
Date: 2017-10-15T15:55:23Z
[FLINK-7755] [table] Fix NULL handling in batch joins.
Fixes [FLINK-5498] (Add support for non-equi join and local predicates to outer joins) as well.
----
---
[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/4858
I'll merge the PR in the next few days.
---
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4858
---
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4858#discussion_r145863086
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala ---
@@ -97,8 +97,7 @@ private class FlinkLogicalJoinConverter
private def hasEqualityPredicates(join: LogicalJoin, joinInfo: JoinInfo): Boolean = {
--- End diff --
The parameter `join` seems to be useless now.
---
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4858#discussion_r145938918
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---
@@ -156,65 +163,394 @@ class DataSetJoin(
val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val (joinOperator, nullCheck) = joinType match {
- case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
- case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
- case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
- case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+ joinType match {
+ case JoinRelType.INNER =>
+ addInnerJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.LEFT =>
+ addLeftOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.RIGHT =>
+ addRightOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.FULL =>
+ addFullOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
}
+ }
- if (nullCheck && !config.getNullCheck) {
- throw TableException("Null check in TableConfig must be enabled for outer joins.")
- }
+ private def addInnerJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
val generator = new FunctionCodeGenerator(
config,
- nullCheck,
- leftDataSet.getType,
- Some(rightDataSet.getType))
+ false,
+ left.getType,
+ Some(right.getType))
val conversion = generator.generateConverterResultExpression(
- returnType,
+ resultType,
joinRowType.getFieldNames)
- var body = ""
+ val condition = generator.generateExpression(joinCondition)
+ val body =
+ s"""
+ |${condition.code}
+ |if (${condition.resultTerm}) {
+ | ${conversion.code}
+ | ${generator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
- if (joinInfo.isEqui) {
- // only equality condition
- body = s"""
- |${conversion.code}
- |${generator.collectorTerm}.collect(${conversion.resultTerm});
- |""".stripMargin
- }
- else {
- val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
- val condition = generator.generateExpression(nonEquiPredicates)
- body = s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${generator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
- }
val genFunction = generator.generateFunction(
ruleDescription,
classOf[FlatJoinFunction[Row, Row, Row]],
body,
- returnType)
+ resultType)
val joinFun = new FlatJoinRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType)
- val joinOpName =
- s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
- s"join: (${joinSelectionToString(joinRowType)})"
+ left.join(right)
+ .where(leftKeys: _*)
+ .equalTo(rightKeys: _*)
+ .`with`(joinFun)
+ .name(getJoinOpName)
+ }
+
+ private def addLeftOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
+
+ // replace field names by indexed names for easier key handling
+ val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+ val rightType = right.getType.asInstanceOf[RowTypeInfo]
+
+ // partition and sort left input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy)
+ val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
+
+ // deduplicate the rows of the left input
+ val deduplicatedRowsLeft: DataSet[Row] = deduplicateRows(partitionedSortedLeft, leftType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
+ val joinFun = new LeftOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
+ val joinPairs = deduplicatedRowsLeft.leftOuterJoin(right, JoinHint.REPARTITION_SORT_MERGE)
+ .where(nestedLeftKeys: _*)
+ .equalTo(rightKeys: _*)
+ .`with`(joinFun)
+ .withForwardedFieldsFirst("f0->f0")
+ .name(joinOpName)
+
+ // create GroupReduceFunction to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val reduceFun = new LeftOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // convert join pairs to result.
+ // This step ensures we preserve the rows of the left input.
+ joinPairs
+ .groupBy("f0")
+ .reduceGroup(reduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+ }
+
+ private def addRightOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
- joinOperator
- .where(leftKeys.toArray: _*)
- .equalTo(rightKeys.toArray: _*)
+ // replace field names by indexed names for easier key handling
+ val leftType = left.getType.asInstanceOf[RowTypeInfo]
+ val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+
+ // partition and sort right input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy)
+ val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
+
+ // deduplicate the rows of the right input
+ val deduplicatedRowsRight: DataSet[Row] = deduplicateRows(partitionedSortedRight, rightType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
+ val joinFun = new RightOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
+ val joinPairs = left.rightOuterJoin(deduplicatedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
+ .where(leftKeys: _*)
+ .equalTo(nestedRightKeys: _*)
.`with`(joinFun)
+ .withForwardedFieldsSecond("f0->f1")
+ .name(joinOpName)
+
+ // create GroupReduceFunction to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val reduceFun = new RightOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // convert join pairs to result
+ // This step ensures we preserve the rows of the right input.
+ joinPairs
+ .groupBy("f1")
+ .reduceGroup(reduceFun)
.name(joinOpName)
+ .returns(resultType)
}
+
+ private def addFullOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
+
+ // replace field names by indexed names for easier key handling
+ val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+ val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+
+ // partition and sort left and right input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy), except the second grouping to preserve right rows.
+ val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
+ val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
+
+ // deduplicate the rows of the left and right input
+ val deduplicatedRowsLeft: DataSet[Row] = deduplicateRows(partitionedSortedLeft, leftType)
+ val deduplicatedRowsRight: DataSet[Row] = deduplicateRows(partitionedSortedRight, rightType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT, Types.INT)
+ val joinFun = new FullOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
+ val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
+ val joinPairs = deduplicatedRowsLeft
+ .fullOuterJoin(deduplicatedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
+ .where(nestedLeftKeys: _*)
+ .equalTo(nestedRightKeys: _*)
+ .`with`(joinFun)
+ .withForwardedFieldsFirst("f0->f0")
+ .withForwardedFieldsSecond("f0->f1")
+ .name(joinOpName)
+
+ // create GroupReduceFunctions to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val leftReduceFun = new LeftFullOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+ val rightReduceFun = new RightFullOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // compute joined (left + right) and left preserved (left + null)
+ val joinedAndLeftPreserved = joinPairs
+ // filter for pairs with left row
+ .filter(new FilterFunction[Row](){
+ override def filter(row: Row): Boolean = row.getField(0) != null})
+ .groupBy("f0")
+ .reduceGroup(leftReduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+
+ // compute right preserved (null + right)
+ val rightPreserved = joinPairs
+ // filter for pairs with right row
+ .filter(new FilterFunction[Row](){
+ override def filter(row: Row): Boolean = row.getField(1) != null})
+ .groupBy("f1")
+ .reduceGroup(rightReduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+
+ // union joined (left + right), left preserved (left + null), and right preserved (null + right)
+ joinedAndLeftPreserved.union(rightPreserved)
+ }
+
+ private def getJoinOpName: String = {
+ s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
+ s"join: (${joinSelectionToString(joinRowType)})"
+ }
+
+ /** Returns an array of indicies with some indicies being a prefix. */
+ private def getFullIndiciesWithPrefix(keys: Array[Int], numFields: Int): Array[Int] = {
+ // get indicies of all fields which are not keys
+ val nonKeys = (0 until numFields).filter(i => !keys.contains(i))
--- End diff --
Could be simplified with '_'.
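
For illustration, the placeholder form might look as follows. This is only a sketch: `fullIndicesWithPrefix` mirrors the helper quoted in the diff (with the spelling adjusted) and wraps it in a hypothetical `FullIndices` object so that it runs standalone. It is not the merged code.

```scala
object FullIndices {
  /** Returns all field indices, with the key indices moved to the front. */
  def fullIndicesWithPrefix(keys: Array[Int], numFields: Int): Array[Int] = {
    // `!keys.contains(_)` is shorthand for the explicit lambda `i => !keys.contains(i)`
    val nonKeys = (0 until numFields).filter(!keys.contains(_))
    // keys first, then the remaining fields in ascending order
    keys ++ nonKeys
  }
}
```

For example, `FullIndices.fullIndicesWithPrefix(Array(2, 0), 4)` returns the indices 2, 0, 1, 3.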
---
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4858#discussion_r146076115
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala ---
@@ -41,8 +41,7 @@ class DataSetJoinRule
val joinInfo = join.analyzeCondition
// joins require an equi-condition or a conjunctive predicate with at least one equi-condition
- // and disable outer joins with non-equality predicates(see FLINK-5520)
- !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+ !joinInfo.pairs().isEmpty
--- End diff --
Yes, that is true, but the rules are also applied in different contexts. `FlinkLogicalJoin` is used for the initial translation of batch and stream programs and `DataSetJoinRule` only for batch. I think it's OK to have these checks as a safety net.
---
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4858#discussion_r146591835
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
@@ -486,11 +486,6 @@ case class Join(
s"Invalid join condition: $expression. At least one equi-join predicate is " +
s"required.")
}
- if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
--- End diff --
Good point. Will remove that. Thanks @lincoln-lil!
---
[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/4858
I scanned through the code quickly and could not find any major issues. Well documented, well tested. +1 to merge this. Please don't forget to update the docs.
---
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4858#discussion_r145863708
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala ---
@@ -41,8 +41,7 @@ class DataSetJoinRule
val joinInfo = join.analyzeCondition
// joins require an equi-condition or a conjunctive predicate with at least one equi-condition
- // and disable outer joins with non-equality predicates(see FLINK-5520)
- !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+ !joinInfo.pairs().isEmpty
--- End diff --
The condition is checked three times in `FlinkLogicalJoin`, `DataSetJoinRule`, and `DataSetJoin`. It brings extra maintenance work every time we change the validation rule.
---
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:
https://github.com/apache/flink/pull/4858#discussion_r146504940
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
@@ -486,11 +486,6 @@ case class Join(
s"Invalid join condition: $expression. At least one equi-join predicate is " +
s"required.")
}
- if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
--- End diff --
The local variables `nonEquiJoinPredicateFound` and `localPredicateFound` can be removed here, and the `checkIfFilterCondition` method is no longer needed.
---
[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/4858
Thanks for the review @twalthr.
The docs don't need to be touched. The previous restrictions were not documented...
Will merge this PR.
---
[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/4858
Thanks for the review @xccui!
I've updated the PR.
Cheers, Fabian
---
[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...
Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4858#discussion_r145938669
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---
@@ -156,65 +163,394 @@ class DataSetJoin(
val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val (joinOperator, nullCheck) = joinType match {
- case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
- case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
- case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
- case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+ joinType match {
+ case JoinRelType.INNER =>
+ addInnerJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.LEFT =>
+ addLeftOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.RIGHT =>
+ addRightOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.FULL =>
+ addFullOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
}
+ }
- if (nullCheck && !config.getNullCheck) {
- throw TableException("Null check in TableConfig must be enabled for outer joins.")
- }
+ private def addInnerJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
val generator = new FunctionCodeGenerator(
config,
- nullCheck,
- leftDataSet.getType,
- Some(rightDataSet.getType))
+ false,
+ left.getType,
+ Some(right.getType))
val conversion = generator.generateConverterResultExpression(
- returnType,
+ resultType,
joinRowType.getFieldNames)
- var body = ""
+ val condition = generator.generateExpression(joinCondition)
+ val body =
+ s"""
+ |${condition.code}
+ |if (${condition.resultTerm}) {
+ | ${conversion.code}
+ | ${generator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
- if (joinInfo.isEqui) {
- // only equality condition
- body = s"""
- |${conversion.code}
- |${generator.collectorTerm}.collect(${conversion.resultTerm});
- |""".stripMargin
- }
- else {
- val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
- val condition = generator.generateExpression(nonEquiPredicates)
- body = s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${generator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
- }
val genFunction = generator.generateFunction(
ruleDescription,
classOf[FlatJoinFunction[Row, Row, Row]],
body,
- returnType)
+ resultType)
val joinFun = new FlatJoinRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType)
- val joinOpName =
- s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
- s"join: (${joinSelectionToString(joinRowType)})"
+ left.join(right)
+ .where(leftKeys: _*)
+ .equalTo(rightKeys: _*)
+ .`with`(joinFun)
+ .name(getJoinOpName)
+ }
+
+ private def addLeftOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
+
+ // replace field names by indexed names for easier key handling
+ val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+ val rightType = right.getType.asInstanceOf[RowTypeInfo]
+
+ // partition and sort left input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy)
+ val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
+
+ // deduplicate the rows of the left input
+ val deduplicatedRowsLeft: DataSet[Row] = deduplicateRows(partitionedSortedLeft, leftType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
+ val joinFun = new LeftOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
+ val joinPairs = deduplicatedRowsLeft.leftOuterJoin(right, JoinHint.REPARTITION_SORT_MERGE)
+ .where(nestedLeftKeys: _*)
+ .equalTo(rightKeys: _*)
+ .`with`(joinFun)
+ .withForwardedFieldsFirst("f0->f0")
+ .name(joinOpName)
+
+ // create GroupReduceFunction to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val reduceFun = new LeftOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // convert join pairs to result.
+ // This step ensures we preserve the rows of the left input.
+ joinPairs
+ .groupBy("f0")
+ .reduceGroup(reduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+ }
+
+ private def addRightOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
- joinOperator
- .where(leftKeys.toArray: _*)
- .equalTo(rightKeys.toArray: _*)
+ // replace field names by indexed names for easier key handling
+ val leftType = left.getType.asInstanceOf[RowTypeInfo]
+ val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+
+ // partition and sort right input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy)
+ val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
+
+ // deduplicate the rows of the right input
+ val deduplicatedRowsRight: DataSet[Row] = deduplicateRows(partitionedSortedRight, rightType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
+ val joinFun = new RightOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
+ val joinPairs = left.rightOuterJoin(deduplicatedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
+ .where(leftKeys: _*)
+ .equalTo(nestedRightKeys: _*)
.`with`(joinFun)
+ .withForwardedFieldsSecond("f0->f1")
+ .name(joinOpName)
+
+ // create GroupReduceFunction to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val reduceFun = new RightOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // convert join pairs to result
+ // This step ensures we preserve the rows of the right input.
+ joinPairs
+ .groupBy("f1")
+ .reduceGroup(reduceFun)
.name(joinOpName)
+ .returns(resultType)
}
+
+ private def addFullOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
+
+ // replace field names by indexed names for easier key handling
+ val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+ val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+
+ // partition and sort left and right input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy), except the second grouping to preserve right rows.
+ val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
+ val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
+
+ // deduplicate the rows of the left and right input
+ val deduplicatedRowsLeft: DataSet[Row] = deduplicateRows(partitionedSortedLeft, leftType)
+ val deduplicatedRowsRight: DataSet[Row] = deduplicateRows(partitionedSortedRight, rightType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT, Types.INT)
+ val joinFun = new FullOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
+ val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
+ val joinPairs = deduplicatedRowsLeft
+ .fullOuterJoin(deduplicatedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
+ .where(nestedLeftKeys: _*)
+ .equalTo(nestedRightKeys: _*)
+ .`with`(joinFun)
+ .withForwardedFieldsFirst("f0->f0")
+ .withForwardedFieldsSecond("f0->f1")
+ .name(joinOpName)
+
+ // create GroupReduceFunctions to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val leftReduceFun = new LeftFullOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+ val rightReduceFun = new RightFullOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // compute joined (left + right) and left preserved (left + null)
+ val joinedAndLeftPreserved = joinPairs
+ // filter for pairs with left row
+ .filter(new FilterFunction[Row](){
+ override def filter(row: Row): Boolean = row.getField(0) != null})
+ .groupBy("f0")
+ .reduceGroup(leftReduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+
+ // compute right preserved (null + right)
+ val rightPreserved = joinPairs
+ // filter for pairs with right row
+ .filter(new FilterFunction[Row](){
+ override def filter(row: Row): Boolean = row.getField(1) != null})
+ .groupBy("f1")
+ .reduceGroup(rightReduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+
+ // union joined (left + right), left preserved (left + null), and right preserved (null + right)
+ joinedAndLeftPreserved.union(rightPreserved)
+ }
+
+ private def getJoinOpName: String = {
+ s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
+ s"join: (${joinSelectionToString(joinRowType)})"
+ }
+
+ /** Returns an array of indicies with some indicies being a prefix. */
+ private def getFullIndiciesWithPrefix(keys: Array[Int], numFields: Int): Array[Int] = {
+ // get indicies of all fields which are not keys
+ val nonKeys = (0 until numFields).filter(i => !keys.contains(i))
+ // return all field indicies prefixed by keys
+ keys ++ nonKeys
+ }
+
+ /**
+ * Partitions the data set on the join keys and sort it on all field with the join keys being a
+ * prefix.
+ */
+ private def partitionAndSort(
+ dataSet: DataSet[Row],
+ partitionKeys: Array[Int]): DataSet[Row] = {
+
+ // construct full sort keys with partitionKeys being a prefix
+ val sortKeys = getFullIndiciesWithPrefix(partitionKeys, dataSet.getType.getArity)
+ // partition
+ val partitioned: DataSet[Row] = dataSet.partitionByHash(partitionKeys: _*)
+ // sort on all fields
+ sortKeys.foldLeft(partitioned: DataSet[Row]) { (d, i) =>
+ d.sortPartition(i, Order.ASCENDING).asInstanceOf[DataSet[Row]]
+ }
+ }
+
+ /**
+ * Deduplicates the rows of a data set and emits a row for each unique row with with the first
+ * field being the unique row and the second field being the number of duplicates of the row.
+ */
+ private def deduplicateRows(
--- End diff --
The function name is a little bit misleading. How about `foldIdenticalRows`?
---