Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2017/10/19 10:26:37 UTC

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/4858

    [FLINK-7755] [table] Fix NULL handling in batch joins.

    ## What is the purpose of the change
    
    This PR fixes a couple of issues with Table API / SQL batch joins:
    - Proper support for joining null values for inner and outer joins (illustrated below)
    - Support for non-equi join predicates in outer joins (at least one equi-join predicate is required)
    - Support for local predicates on the outer input of outer joins (at least one equi-join predicate is required)
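
    As a quick illustration of the first point (a hypothetical snippet, not from this PR's code): plain Scala/Java equality happily matches two null join keys, while under SQL's three-valued logic `NULL = NULL` evaluates to UNKNOWN, so an inner join must drop such rows and an outer join must emit them null-padded instead.

        // two null join keys, as they might appear in Row fields
        val a: Integer = null
        val b: Integer = null
        println(a == b)  // true under Scala/Java equality
        // under SQL semantics `a = b` is UNKNOWN (treated as false), which is
        // why all join predicates, including the equi-join predicates, are
        // re-evaluated in a code-gen'd function instead of relying on
        // key-based matching alone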
    
    ## Brief change log
    
    - Inner & Outer Joins: Evaluate all join predicates (including the equi-join predicates) in a code-gen'd function for correct handling of three-valued logic
    - Outer joins: translate outer joins into a sequence of GroupReduce -> OuterJoin -> GroupReduce (see the sketch after this list).
      - The first GroupReduce groups on the full input row and deduplicates the outer side(s) of the join. A count of the deduplicated rows is kept.
      - The OuterJoin evaluates the join predicate and computes possible join pairs of left and right rows. The non-outer element of a pair can be null if the join predicate does not match.
      - The second GroupReduce groups again on the full input row and computes the join result for each outer row. If the row did not match any inner row, a null-padded result is produced.
      - The plan for left and right outer joins requires only a single initial partitioning and sort of each input. All following operators can reuse the initial sort and produce a sorted result again. A full outer join requires an additional partitioning and sorting step.
    - Checks for outer join translation are removed to allow outer joins with non-equi and local predicates.
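
    As a rough, self-contained sketch of this plan (toy code with hypothetical names and data, not the PR's generated operators), the GroupReduce -> OuterJoin -> GroupReduce sequence for a left outer join on a single integer key could look like this:

        import org.apache.flink.api.scala._
        import org.apache.flink.util.Collector

        object OuterJoinPlanSketch {
          def main(args: Array[String]): Unit = {
            val env = ExecutionEnvironment.getExecutionEnvironment

            // toy inputs: left rows (key, name), right rows (key, amount)
            val left  = env.fromElements((1, "a"), (1, "a"), (2, "b"))
            val right = env.fromElements((1, 10), (1, 11))

            // 1st GroupReduce: deduplicate the outer (left) side on the full
            // row, keeping a count of how many identical rows were folded
            val dedupLeft = left
              .map(row => (row, 1))
              .groupBy("_1")
              .reduce((a, b) => (a._1, a._2 + b._2))

            // OuterJoin: compute possible join pairs; the right element of a
            // pair is None if the join predicate did not match
            val pairs = dedupLeft
              .leftOuterJoin(right)
              .where("_1._1").equalTo("_1")
              .apply((l, r) => (l._1, l._2, Option(r)))

            // 2nd GroupReduce: group on the full outer row again; emit every
            // match `count` times, or one null-padded result per original row
            // if nothing matched
            val result = pairs
              .groupBy("_1")
              .reduceGroup {
                (rows: Iterator[((Int, String), Int, Option[(Int, Int)])],
                 out: Collector[((Int, String), Option[(Int, Int)])]) =>
                  val all = rows.toSeq
                  val matches = all.filter(_._3.isDefined)
                  val count = all.head._2
                  val toEmit = if (matches.nonEmpty) matches else all.take(1)
                  for (p <- toEmit; _ <- 1 to count) out.collect((p._1, p._3))
              }

            result.print()
          }
        }

    Deduplicating the outer side first reduces the data volume that flows through the join, and the kept count restores the original row multiplicity in the final GroupReduce.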
    
    ## Verifying this change
    
    - added ITCases for the new outer join features to `JoinITCase`
    - added plan tests for Table API and SQL for the new outer join features
    - updated validation tests
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **yes**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
    
    ## Documentation
    
    The documentation does not need to be adjusted because the outer join limitations were not documented.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink tableBatchNullJoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4858.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4858
    
----
commit 1434d8d7debe207e4b8350199eded4e678885571
Author: Fabian Hueske <fh...@apache.org>
Date:   2017-10-15T15:55:23Z

    [FLINK-7755] [table] Fix NULL handling in batch joins.
    
    Fixes [FLINK-5498] (Add support for non-equi join and local predicates to outer joins) as well.

----


---

[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4858
  
    I'll merge the PR in the next few days.


---

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4858


---

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4858#discussion_r145863086
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala ---
    @@ -97,8 +97,7 @@ private class FlinkLogicalJoinConverter
     
       private def hasEqualityPredicates(join: LogicalJoin, joinInfo: JoinInfo): Boolean = {
    --- End diff --
    
    The parameter `join` seems to be useless now.
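
    For example (assuming nothing else depends on it), the signature could presumably shrink to:

        private def hasEqualityPredicates(joinInfo: JoinInfo): Boolean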


---

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4858#discussion_r145938918
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---
    @@ -156,65 +163,394 @@ class DataSetJoin(
         val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
         val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     
    -    val (joinOperator, nullCheck) = joinType match {
    -      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
    -      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
    -      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
    -      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
    +    joinType match {
    +      case JoinRelType.INNER =>
    +        addInnerJoin(
    +          leftDataSet,
    +          rightDataSet,
    +          leftKeys.toArray,
    +          rightKeys.toArray,
    +          returnType,
    +          config)
    +      case JoinRelType.LEFT =>
    +        addLeftOuterJoin(
    +          leftDataSet,
    +          rightDataSet,
    +          leftKeys.toArray,
    +          rightKeys.toArray,
    +          returnType,
    +          config)
    +      case JoinRelType.RIGHT =>
    +        addRightOuterJoin(
    +          leftDataSet,
    +          rightDataSet,
    +          leftKeys.toArray,
    +          rightKeys.toArray,
    +          returnType,
    +          config)
    +      case JoinRelType.FULL =>
    +        addFullOuterJoin(
    +          leftDataSet,
    +          rightDataSet,
    +          leftKeys.toArray,
    +          rightKeys.toArray,
    +          returnType,
    +          config)
         }
    +  }
     
    -    if (nullCheck && !config.getNullCheck) {
    -      throw TableException("Null check in TableConfig must be enabled for outer joins.")
    -    }
    +  private def addInnerJoin(
    +      left: DataSet[Row],
    +      right: DataSet[Row],
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int],
    +      resultType: TypeInformation[Row],
    +      config: TableConfig): DataSet[Row] = {
     
         val generator = new FunctionCodeGenerator(
           config,
    -      nullCheck,
    -      leftDataSet.getType,
    -      Some(rightDataSet.getType))
    +      false,
    +      left.getType,
    +      Some(right.getType))
         val conversion = generator.generateConverterResultExpression(
    -      returnType,
    +      resultType,
           joinRowType.getFieldNames)
     
    -    var body = ""
    +    val condition = generator.generateExpression(joinCondition)
    +    val body =
    +      s"""
    +         |${condition.code}
    +         |if (${condition.resultTerm}) {
    +         |  ${conversion.code}
    +         |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
    +         |}
    +         |""".stripMargin
     
    -    if (joinInfo.isEqui) {
    -      // only equality condition
    -      body = s"""
    -           |${conversion.code}
    -           |${generator.collectorTerm}.collect(${conversion.resultTerm});
    -           |""".stripMargin
    -    }
    -    else {
    -      val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
    -      val condition = generator.generateExpression(nonEquiPredicates)
    -      body = s"""
    -           |${condition.code}
    -           |if (${condition.resultTerm}) {
    -           |  ${conversion.code}
    -           |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
    -           |}
    -           |""".stripMargin
    -    }
         val genFunction = generator.generateFunction(
           ruleDescription,
           classOf[FlatJoinFunction[Row, Row, Row]],
           body,
    -      returnType)
    +      resultType)
     
         val joinFun = new FlatJoinRunner[Row, Row, Row](
           genFunction.name,
           genFunction.code,
           genFunction.returnType)
     
    -    val joinOpName =
    -      s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
    -        s"join: (${joinSelectionToString(joinRowType)})"
    +    left.join(right)
    +      .where(leftKeys: _*)
    +      .equalTo(rightKeys: _*)
    +      .`with`(joinFun)
    +      .name(getJoinOpName)
    +  }
    +
    +  private def addLeftOuterJoin(
    +      left: DataSet[Row],
    +      right: DataSet[Row],
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int],
    +      resultType: TypeInformation[Row],
    +      config: TableConfig): DataSet[Row] = {
    +
    +    if (!config.getNullCheck) {
    +      throw TableException("Null check in TableConfig must be enabled for outer joins.")
    +    }
    +
    +    val joinOpName = getJoinOpName
    +
    +    // replace field names by indexed names for easier key handling
    +    val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
    +    val rightType = right.getType.asInstanceOf[RowTypeInfo]
    +
    +    // partition and sort left input
    +    // this step ensures we can reuse the sorting for all following operations
    +    // (groupBy->join->groupBy)
    +    val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
    +
    +    // deduplicate the rows of the left input
    +    val deduplicatedRowsLeft: DataSet[Row] = deduplicateRows(partitionedSortedLeft, leftType)
    +
    +    // create JoinFunction to evaluate join predicate
    +    val predFun = generatePredicateFunction(leftType, rightType, config)
    +    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
    +    val joinFun = new LeftOuterJoinRunner(predFun.name, predFun.code, joinOutType)
    +
    +    // join left and right inputs, evaluate join predicate, and emit join pairs
    +    val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
    +    val joinPairs = deduplicatedRowsLeft.leftOuterJoin(right, JoinHint.REPARTITION_SORT_MERGE)
    +      .where(nestedLeftKeys: _*)
    +      .equalTo(rightKeys: _*)
    +      .`with`(joinFun)
    +      .withForwardedFieldsFirst("f0->f0")
    +      .name(joinOpName)
    +
    +    // create GroupReduceFunction to generate the join result
    +    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
    +    val reduceFun = new LeftOuterJoinGroupReduceRunner(
    +      convFun.name,
    +      convFun.code,
    +      convFun.returnType)
    +
    +    // convert join pairs to result.
    +    // This step ensures we preserve the rows of the left input.
    +    joinPairs
    +      .groupBy("f0")
    +      .reduceGroup(reduceFun)
    +      .name(joinOpName)
    +      .returns(resultType)
    +  }
    +
    +  private def addRightOuterJoin(
    +      left: DataSet[Row],
    +      right: DataSet[Row],
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int],
    +      resultType: TypeInformation[Row],
    +      config: TableConfig): DataSet[Row] = {
    +
    +    if (!config.getNullCheck) {
    +      throw TableException("Null check in TableConfig must be enabled for outer joins.")
    +    }
    +
    +    val joinOpName = getJoinOpName
     
    -    joinOperator
    -      .where(leftKeys.toArray: _*)
    -      .equalTo(rightKeys.toArray: _*)
    +    // replace field names by indexed names for easier key handling
    +    val leftType = left.getType.asInstanceOf[RowTypeInfo]
    +    val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
    +
    +    // partition and sort right input
    +    // this step ensures we can reuse the sorting for all following operations
    +    // (groupBy->join->groupBy)
    +    val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
    +
    +    // deduplicate the rows of the right input
    +    val deduplicatedRowsRight: DataSet[Row] = deduplicateRows(partitionedSortedRight, rightType)
    +
    +    // create JoinFunction to evaluate join predicate
    +    val predFun = generatePredicateFunction(leftType, rightType, config)
    +    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
    +    val joinFun = new RightOuterJoinRunner(predFun.name, predFun.code, joinOutType)
    +
    +    // join left and right inputs, evaluate join predicate, and emit join pairs
    +    val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
    +    val joinPairs = left.rightOuterJoin(deduplicatedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
    +      .where(leftKeys: _*)
    +      .equalTo(nestedRightKeys: _*)
           .`with`(joinFun)
    +      .withForwardedFieldsSecond("f0->f1")
    +      .name(joinOpName)
    +
    +    // create GroupReduceFunction to generate the join result
    +    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
    +    val reduceFun = new RightOuterJoinGroupReduceRunner(
    +      convFun.name,
    +      convFun.code,
    +      convFun.returnType)
    +
    +    // convert join pairs to result
    +    // This step ensures we preserve the rows of the right input.
    +    joinPairs
    +      .groupBy("f1")
    +      .reduceGroup(reduceFun)
           .name(joinOpName)
    +      .returns(resultType)
       }
    +
    +  private def addFullOuterJoin(
    +      left: DataSet[Row],
    +      right: DataSet[Row],
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int],
    +      resultType: TypeInformation[Row],
    +      config: TableConfig): DataSet[Row] = {
    +
    +    if (!config.getNullCheck) {
    +      throw TableException("Null check in TableConfig must be enabled for outer joins.")
    +    }
    +
    +    val joinOpName = getJoinOpName
    +
    +    // replace field names by indexed names for easier key handling
    +    val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
    +    val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
    +
    +    // partition and sort left and right input
    +    // this step ensures we can reuse the sorting for all following operations
    +    // (groupBy->join->groupBy), except the second grouping to preserve right rows.
    +    val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
    +    val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
    +
    +    // deduplicate the rows of the left and right input
    +    val deduplicatedRowsLeft: DataSet[Row] = deduplicateRows(partitionedSortedLeft, leftType)
    +    val deduplicatedRowsRight: DataSet[Row] = deduplicateRows(partitionedSortedRight, rightType)
    +
    +    // create JoinFunction to evaluate join predicate
    +    val predFun = generatePredicateFunction(leftType, rightType, config)
    +    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT, Types.INT)
    +    val joinFun = new FullOuterJoinRunner(predFun.name, predFun.code, joinOutType)
    +
    +    // join left and right inputs, evaluate join predicate, and emit join pairs
    +    val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
    +    val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
    +    val joinPairs = deduplicatedRowsLeft
    +      .fullOuterJoin(deduplicatedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
    +      .where(nestedLeftKeys: _*)
    +      .equalTo(nestedRightKeys: _*)
    +      .`with`(joinFun)
    +      .withForwardedFieldsFirst("f0->f0")
    +      .withForwardedFieldsSecond("f0->f1")
    +      .name(joinOpName)
    +
    +    // create GroupReduceFunctions to generate the join result
    +    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
    +    val leftReduceFun = new LeftFullOuterJoinGroupReduceRunner(
    +      convFun.name,
    +      convFun.code,
    +      convFun.returnType)
    +    val rightReduceFun = new RightFullOuterJoinGroupReduceRunner(
    +      convFun.name,
    +      convFun.code,
    +      convFun.returnType)
    +
    +    // compute joined (left + right) and left preserved (left + null)
    +    val joinedAndLeftPreserved = joinPairs
    +      // filter for pairs with left row
    +      .filter(new FilterFunction[Row](){
    +        override def filter(row: Row): Boolean = row.getField(0) != null})
    +      .groupBy("f0")
    +      .reduceGroup(leftReduceFun)
    +      .name(joinOpName)
    +      .returns(resultType)
    +
    +    // compute right preserved (null + right)
    +    val rightPreserved = joinPairs
    +      // filter for pairs with right row
    +      .filter(new FilterFunction[Row](){
    +        override def filter(row: Row): Boolean = row.getField(1) != null})
    +      .groupBy("f1")
    +      .reduceGroup(rightReduceFun)
    +      .name(joinOpName)
    +      .returns(resultType)
    +
    +    // union joined (left + right), left preserved (left + null), and right preserved (null + right)
    +    joinedAndLeftPreserved.union(rightPreserved)
    +  }
    +
    +  private def getJoinOpName: String = {
    +    s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
    +      s"join: (${joinSelectionToString(joinRowType)})"
    +  }
    +
    +  /** Returns an array of indices with some indices being a prefix. */
    +  private def getFullIndiciesWithPrefix(keys: Array[Int], numFields: Int): Array[Int] = {
    +    // get indices of all fields which are not keys
    +    val nonKeys = (0 until numFields).filter(i => !keys.contains(i))
    --- End diff --
    
    Could be simplified with '_'.
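
    Presumably (an assumption of what was meant) something like:

        val nonKeys = (0 until numFields).filterNot(keys.contains(_))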


---

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4858#discussion_r146076115
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala ---
    @@ -41,8 +41,7 @@ class DataSetJoinRule
         val joinInfo = join.analyzeCondition
     
         // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
    -    // and disable outer joins with non-equality predicates(see FLINK-5520)
    -    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
    +    !joinInfo.pairs().isEmpty
    --- End diff --
    
    Yes, that is true, but the rules are also applied in different contexts. `FlinkLogicalJoin` is used for the initial translation of batch and stream programs and `DataSetJoinRule` only for batch. I think it's OK to have these checks as a safety net.


---

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4858#discussion_r146591835
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -486,11 +486,6 @@ case class Join(
             s"Invalid join condition: $expression. At least one equi-join predicate is " +
               s"required.")
         }
    -    if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
    --- End diff --
    
    Good point. Will remove that. Thanks @lincoln-lil!


---

[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4858
  
    I scanned through the code quickly and could not find any major issues. Well documented, well tested. +1 to merge this. Please don't forget to update the docs.


---

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4858#discussion_r145863708
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala ---
    @@ -41,8 +41,7 @@ class DataSetJoinRule
         val joinInfo = join.analyzeCondition
     
         // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
    -    // and disable outer joins with non-equality predicates(see FLINK-5520)
    -    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
    +    !joinInfo.pairs().isEmpty
    --- End diff --
    
    The condition is checked three times in `FlinkLogicalJoin`, `DataSetJoinRule`, and `DataSetJoin`. It brings extra maintenance work every time we change the validation rule. 


---

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4858#discussion_r146504940
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -486,11 +486,6 @@ case class Join(
             s"Invalid join condition: $expression. At least one equi-join predicate is " +
               s"required.")
         }
    -    if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
    --- End diff --
    
    The local variables `nonEquiJoinPredicateFound` and `localPredicateFound` can be removed here, and the `checkIfFilterCondition` method is no longer needed.


---

[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4858
  
    Thanks for the review @twalthr. 
    The docs don't need to be touched. The previous restrictions were not documented...
    
    Will merge this PR.


---

[GitHub] flink issue #4858: [FLINK-7755] [table] Fix NULL handling in batch joins.

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4858
  
    Thanks for the review @xccui! 
    I've updated the PR.
    Cheers, Fabian


---

[GitHub] flink pull request #4858: [FLINK-7755] [table] Fix NULL handling in batch jo...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4858#discussion_r145938669
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---
    @@ -156,65 +163,394 @@ class DataSetJoin(
    +  /** Returns an array of indices with some indices being a prefix. */
    +  private def getFullIndiciesWithPrefix(keys: Array[Int], numFields: Int): Array[Int] = {
    +    // get indices of all fields which are not keys
    +    val nonKeys = (0 until numFields).filter(i => !keys.contains(i))
    +    // return all field indices prefixed by keys
    +    keys ++ nonKeys
    +  }
    +
    +  /**
    +    * Partitions the data set on the join keys and sorts it on all fields with the join keys
    +    * being a prefix.
    +    */
    +  private def partitionAndSort(
    +      dataSet: DataSet[Row],
    +      partitionKeys: Array[Int]): DataSet[Row] = {
    +
    +    // construct full sort keys with partitionKeys being a prefix
    +    val sortKeys = getFullIndiciesWithPrefix(partitionKeys, dataSet.getType.getArity)
    +    // partition
    +    val partitioned: DataSet[Row] = dataSet.partitionByHash(partitionKeys: _*)
    +    // sort on all fields
    +    sortKeys.foldLeft(partitioned: DataSet[Row]) { (d, i) =>
    +      d.sortPartition(i, Order.ASCENDING).asInstanceOf[DataSet[Row]]
    +    }
    +  }
    +
    +  /**
    +    * Deduplicates the rows of a data set and emits a row for each unique row with the first
    +    * field being the unique row and the second field being the number of duplicates of the row.
    +    */
    +  private def deduplicateRows(
    --- End diff --
    
    The function name is a little bit misleading. How about `foldIdenticalRows`?


---