Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/24 18:29:04 UTC

[1/3] flink git commit: [FLINK-7755] [table] Fix NULL handling in batch joins.

Repository: flink
Updated Branches:
  refs/heads/master 512b9c478 -> e1c7e4903


[FLINK-7755] [table] Fix NULL handling in batch joins.

- Also fixes [FLINK-5498] Add support for non-equi join and local predicates to outer joins

This closes #4858.
This closes #3379 (stale PR for FLINK-5498).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a638155
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a638155
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a638155

Branch: refs/heads/master
Commit: 2a6381553e48e0145655c0aadd173813de610aa7
Parents: 512b9c4
Author: Fabian Hueske <fh...@apache.org>
Authored: Sun Oct 15 17:55:23 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 24 20:28:11 2017 +0200

----------------------------------------------------------------------
 .../table/codegen/FunctionCodeGenerator.scala   |  12 +
 .../table/codegen/calls/ScalarOperators.scala   |  38 +-
 .../flink/table/plan/logical/operators.scala    |  15 -
 .../table/plan/nodes/dataset/DataSetJoin.scala  | 419 +++++++++++++++++--
 .../plan/nodes/logical/FlinkLogicalJoin.scala   |   7 +-
 .../plan/rules/dataSet/DataSetJoinRule.scala    |   3 +-
 .../runtime/outerJoinGroupReduceRunners.scala   | 244 +++++++++++
 .../flink/table/runtime/outerJoinRunners.scala  | 195 +++++++++
 .../flink/table/api/batch/sql/JoinTest.scala    | 314 ++++++++++++++
 .../sql/validation/JoinValidationTest.scala     |  45 +-
 .../flink/table/api/batch/table/JoinTest.scala  | 304 ++++++++++++++
 .../table/validation/JoinValidationTest.scala   |  55 +--
 .../table/runtime/batch/table/JoinITCase.scala  | 240 ++++++++++-
 13 files changed, 1702 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
index e86c4ab..2bd2fe7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
@@ -125,6 +125,18 @@ class FunctionCodeGenerator(
              s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
     }
 
+    // JoinFunction
+    else if (clazz == classOf[JoinFunction[_, _, _]]) {
+      val baseClass = classOf[RichJoinFunction[_, _, _]]
+      val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
+      val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
+        throw new CodeGenException("Input 2 for JoinFunction should not be null")))
+      (baseClass,
+        s"Object join(Object _in1, Object _in2)",
+        List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
+          s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
+    }
+
     // ProcessFunction
     else if (clazz == classOf[ProcessFunction[_, _]]) {
       val baseClass = classOf[ProcessFunction[_, _]]
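
For orientation, a function generated through this new JoinFunction branch has roughly the following shape (a hand-written Scala sketch; the actual generated source is Java, and the class name is hypothetical):

  import org.apache.flink.api.common.functions.RichJoinFunction
  import org.apache.flink.types.Row

  // Scala equivalent of a generated outer-join predicate function;
  // the body stands in for the generated condition code
  class GeneratedJoinPredicateSketch extends RichJoinFunction[Row, Row, java.lang.Boolean] {
    override def join(in1: Row, in2: Row): java.lang.Boolean = {
      // the generated code evaluates the join condition over in1/in2 here
      java.lang.Boolean.TRUE
    }
  }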

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 7de7aca..bd5b1f7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -382,33 +382,23 @@ object ScalarOperators {
          |boolean $resultTerm = false;
          |boolean $nullTerm = false;
          |if (!${left.nullTerm} && !${left.resultTerm}) {
-         |  // left expr is false, skip right expr
+         |  // left expr is false, result is always false
+         |  // skip right expr
          |} else {
          |  ${right.code}
          |
-         |  if (!${left.nullTerm} && !${right.nullTerm}) {
-         |    $resultTerm = ${left.resultTerm} && ${right.resultTerm};
-         |    $nullTerm = false;
-         |  }
-         |  else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
-         |    $resultTerm = false;
-         |    $nullTerm = true;
-         |  }
-         |  else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
-         |    $resultTerm = false;
-         |    $nullTerm = false;
-         |  }
-         |  else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
-         |    $resultTerm = false;
-         |    $nullTerm = true;
-         |  }
-         |  else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
-         |    $resultTerm = false;
-         |    $nullTerm = false;
-         |  }
-         |  else {
-         |    $resultTerm = false;
-         |    $nullTerm = true;
+         |  if (${left.nullTerm}) {
+         |    // left is null (unknown)
+         |    if (${right.nullTerm} || ${right.resultTerm}) {
+         |      $nullTerm = true;
+         |    }
+         |  } else {
+         |    // left is true
+         |    if (${right.nullTerm}) {
+         |      $nullTerm = true;
+         |    } else if (${right.resultTerm}) {
+         |      $resultTerm = true;
+         |    }
          |  }
          |}
        """.stripMargin

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index e723eef..fe2bfe5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -457,8 +457,6 @@ case class Join(
     }
 
     var equiJoinPredicateFound = false
-    var nonEquiJoinPredicateFound = false
-    var localPredicateFound = false
     // Whether the predicate is literal true.
     val alwaysTrue = expression match {
       case x: Literal if x.value.equals(true) => true
@@ -472,15 +470,7 @@ case class Join(
         if (isAndBranch && checkIfJoinCondition(x)) {
           equiJoinPredicateFound = true
         }
-        if (checkIfFilterCondition(x)) {
-          localPredicateFound = true
-        }
       case x: BinaryComparison =>
-        if (checkIfFilterCondition(x)) {
-          localPredicateFound = true
-        } else {
-          nonEquiJoinPredicateFound = true
-        }
       // The boolean literal should be a valid condition type.
       case x: Literal if x.resultType == Types.BOOLEAN =>
       case x => failValidation(
@@ -500,11 +490,6 @@ case class Join(
           s"Invalid join condition: $expression. At least one equi-join predicate is " +
             s"required.")
       }
-      if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
-        failValidation(
-          s"Invalid join condition: $expression. Non-equality join predicates or local" +
-            s" predicates are not supported in outer joins.")
-      }
     }
   }
 }
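
With these checks removed, outer joins may carry non-equi and local predicates; only the requirement of at least one equi-join predicate remains. A Table API example that becomes valid with this change (it mirrors the new tests further below; `util` stands for a batch test utility as in those tests):

  import org.apache.flink.api.scala._
  import org.apache.flink.table.api.scala._

  val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
  val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)

  // previously rejected for outer joins, now accepted:
  // one equi-join predicate plus a non-equi predicate
  val joined = t.leftOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y)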

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index acbf94d..f039cf9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.plan.nodes.dataset
 
+import java.lang.Iterable
+import java.lang.{Boolean => JBool}
+
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
@@ -25,15 +28,19 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.util.mapping.IntPair
-import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction, GroupReduceFunction, JoinFunction}
+import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonJoin
-import org.apache.flink.table.runtime.FlatJoinRunner
+import org.apache.flink.table.runtime._
 import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -156,65 +163,393 @@ class DataSetJoin(
     val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val (joinOperator, nullCheck) = joinType match {
-      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
-      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
-      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
-      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+    joinType match {
+      case JoinRelType.INNER =>
+        addInnerJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.LEFT =>
+        addLeftOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.RIGHT =>
+        addRightOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
+      case JoinRelType.FULL =>
+        addFullOuterJoin(
+          leftDataSet,
+          rightDataSet,
+          leftKeys.toArray,
+          rightKeys.toArray,
+          returnType,
+          config)
     }
+  }
 
-    if (nullCheck && !config.getNullCheck) {
-      throw TableException("Null check in TableConfig must be enabled for outer joins.")
-    }
+  private def addInnerJoin(
+      left: DataSet[Row],
+      right: DataSet[Row],
+      leftKeys: Array[Int],
+      rightKeys: Array[Int],
+      resultType: TypeInformation[Row],
+      config: TableConfig): DataSet[Row] = {
 
     val generator = new FunctionCodeGenerator(
       config,
-      nullCheck,
-      leftDataSet.getType,
-      Some(rightDataSet.getType))
+      false,
+      left.getType,
+      Some(right.getType))
     val conversion = generator.generateConverterResultExpression(
-      returnType,
+      resultType,
       joinRowType.getFieldNames)
 
-    var body = ""
+    val condition = generator.generateExpression(joinCondition)
+    val body =
+      s"""
+         |${condition.code}
+         |if (${condition.resultTerm}) {
+         |  ${conversion.code}
+         |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+         |}
+         |""".stripMargin
 
-    if (joinInfo.isEqui) {
-      // only equality condition
-      body = s"""
-           |${conversion.code}
-           |${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |""".stripMargin
-    }
-    else {
-      val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
-      val condition = generator.generateExpression(nonEquiPredicates)
-      body = s"""
-           |${condition.code}
-           |if (${condition.resultTerm}) {
-           |  ${conversion.code}
-           |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |}
-           |""".stripMargin
-    }
     val genFunction = generator.generateFunction(
       ruleDescription,
       classOf[FlatJoinFunction[Row, Row, Row]],
       body,
-      returnType)
+      resultType)
 
     val joinFun = new FlatJoinRunner[Row, Row, Row](
       genFunction.name,
       genFunction.code,
       genFunction.returnType)
 
-    val joinOpName =
-      s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
-        s"join: (${joinSelectionToString(joinRowType)})"
+    left.join(right)
+      .where(leftKeys: _*)
+      .equalTo(rightKeys: _*)
+      .`with`(joinFun)
+      .name(getJoinOpName)
+  }
+
+  private def addLeftOuterJoin(
+      left: DataSet[Row],
+      right: DataSet[Row],
+      leftKeys: Array[Int],
+      rightKeys: Array[Int],
+      resultType: TypeInformation[Row],
+      config: TableConfig): DataSet[Row] = {
+
+    if (!config.getNullCheck) {
+      throw TableException("Null check in TableConfig must be enabled for outer joins.")
+    }
+
+    val joinOpName = getJoinOpName
+
+    // replace field names by indexed names for easier key handling
+    val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+    val rightType = right.getType.asInstanceOf[RowTypeInfo]
+
+    // partition and sort left input
+    // this step ensures we can reuse the sorting for all following operations
+    // (groupBy->join->groupBy)
+    val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
+
+    // fold identical rows of the left input
+    val foldedRowsLeft: DataSet[Row] = foldIdenticalRows(partitionedSortedLeft, leftType)
+
+    // create JoinFunction to evaluate join predicate
+    val predFun = generatePredicateFunction(leftType, rightType, config)
+    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
+    val joinFun = new LeftOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+    // join left and right inputs, evaluate join predicate, and emit join pairs
+    val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
+    val joinPairs = foldedRowsLeft.leftOuterJoin(right, JoinHint.REPARTITION_SORT_MERGE)
+      .where(nestedLeftKeys: _*)
+      .equalTo(rightKeys: _*)
+      .`with`(joinFun)
+      .withForwardedFieldsFirst("f0->f0")
+      .name(joinOpName)
+
+    // create GroupReduceFunction to generate the join result
+    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+    val reduceFun = new LeftOuterJoinGroupReduceRunner(
+      convFun.name,
+      convFun.code,
+      convFun.returnType)
+
+    // convert join pairs to result.
+    // This step ensures we preserve the rows of the left input.
+    joinPairs
+      .groupBy("f0")
+      .reduceGroup(reduceFun)
+      .name(joinOpName)
+      .returns(resultType)
+  }
+
+  private def addRightOuterJoin(
+      left: DataSet[Row],
+      right: DataSet[Row],
+      leftKeys: Array[Int],
+      rightKeys: Array[Int],
+      resultType: TypeInformation[Row],
+      config: TableConfig): DataSet[Row] = {
+
+    if (!config.getNullCheck) {
+      throw TableException("Null check in TableConfig must be enabled for outer joins.")
+    }
+
+    val joinOpName = getJoinOpName
 
-    joinOperator
-      .where(leftKeys.toArray: _*)
-      .equalTo(rightKeys.toArray: _*)
+    // replace field names by indexed names for easier key handling
+    val leftType = left.getType.asInstanceOf[RowTypeInfo]
+    val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+
+    // partition and sort right input
+    // this step ensures we can reuse the sorting for all following operations
+    // (groupBy->join->groupBy)
+    val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
+
+    // fold identical rows of the right input
+    val foldedRowsRight: DataSet[Row] = foldIdenticalRows(partitionedSortedRight, rightType)
+
+    // create JoinFunction to evaluate join predicate
+    val predFun = generatePredicateFunction(leftType, rightType, config)
+    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
+    val joinFun = new RightOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+    // join left and right inputs, evaluate join predicate, and emit join pairs
+    val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
+    val joinPairs = left.rightOuterJoin(foldedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
+      .where(leftKeys: _*)
+      .equalTo(nestedRightKeys: _*)
       .`with`(joinFun)
+      .withForwardedFieldsSecond("f0->f1")
+      .name(joinOpName)
+
+    // create GroupReduceFunction to generate the join result
+    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+    val reduceFun = new RightOuterJoinGroupReduceRunner(
+      convFun.name,
+      convFun.code,
+      convFun.returnType)
+
+    // convert join pairs to result
+    // This step ensures we preserve the rows of the right input.
+    joinPairs
+      .groupBy("f1")
+      .reduceGroup(reduceFun)
       .name(joinOpName)
+      .returns(resultType)
   }
+
+  private def addFullOuterJoin(
+      left: DataSet[Row],
+      right: DataSet[Row],
+      leftKeys: Array[Int],
+      rightKeys: Array[Int],
+      resultType: TypeInformation[Row],
+      config: TableConfig): DataSet[Row] = {
+
+    if (!config.getNullCheck) {
+      throw TableException("Null check in TableConfig must be enabled for outer joins.")
+    }
+
+    val joinOpName = getJoinOpName
+
+    // replace field names by indexed names for easier key handling
+    val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+    val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+
+    // partition and sort left and right input
+    // this step ensures we can reuse the sorting for all following operations
+    // (groupBy->join->groupBy), except the second grouping to preserve right rows.
+    val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
+    val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
+
+    // fold identical rows of the left and right input
+    val foldedRowsLeft: DataSet[Row] = foldIdenticalRows(partitionedSortedLeft, leftType)
+    val foldedRowsRight: DataSet[Row] = foldIdenticalRows(partitionedSortedRight, rightType)
+
+    // create JoinFunction to evaluate join predicate
+    val predFun = generatePredicateFunction(leftType, rightType, config)
+    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT, Types.INT)
+    val joinFun = new FullOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+    // join left and right inputs, evaluate join predicate, and emit join pairs
+    val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
+    val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
+    val joinPairs = foldedRowsLeft
+      .fullOuterJoin(foldedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
+      .where(nestedLeftKeys: _*)
+      .equalTo(nestedRightKeys: _*)
+      .`with`(joinFun)
+      .withForwardedFieldsFirst("f0->f0")
+      .withForwardedFieldsSecond("f0->f1")
+      .name(joinOpName)
+
+    // create GroupReduceFunctions to generate the join result
+    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+    val leftReduceFun = new LeftFullOuterJoinGroupReduceRunner(
+      convFun.name,
+      convFun.code,
+      convFun.returnType)
+    val rightReduceFun = new RightFullOuterJoinGroupReduceRunner(
+      convFun.name,
+      convFun.code,
+      convFun.returnType)
+
+    // compute joined (left + right) and left preserved (left + null)
+    val joinedAndLeftPreserved = joinPairs
+      // filter for pairs with left row
+      .filter(new FilterFunction[Row](){
+        override def filter(row: Row): Boolean = row.getField(0) != null})
+      .groupBy("f0")
+      .reduceGroup(leftReduceFun)
+      .name(joinOpName)
+      .returns(resultType)
+
+    // compute right preserved (null + right)
+    val rightPreserved = joinPairs
+      // filter for pairs with right row
+      .filter(new FilterFunction[Row](){
+        override def filter(row: Row): Boolean = row.getField(1) != null})
+      .groupBy("f1")
+      .reduceGroup(rightReduceFun)
+      .name(joinOpName)
+      .returns(resultType)
+
+    // union joined (left + right), left preserved (left + null), and right preserved (null + right)
+    joinedAndLeftPreserved.union(rightPreserved)
+  }
+
+  private def getJoinOpName: String = {
+    s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
+      s"join: (${joinSelectionToString(joinRowType)})"
+  }
+
+  /** Returns an array of all field indices with the key indices as a prefix. */
+  private def getFullIndiciesWithPrefix(keys: Array[Int], numFields: Int): Array[Int] = {
+    // get indices of all fields which are not keys
+    val nonKeys = (0 until numFields).filter(!keys.contains(_))
+    // return all field indices prefixed by the keys
+    keys ++ nonKeys
+  }
+
+  /**
+    * Partitions the data set on the join keys and sorts it on all fields with the join keys
+    * being a prefix.
+    */
+  private def partitionAndSort(
+      dataSet: DataSet[Row],
+      partitionKeys: Array[Int]): DataSet[Row] = {
+
+    // construct full sort keys with partitionKeys being a prefix
+    val sortKeys = getFullIndiciesWithPrefix(partitionKeys, dataSet.getType.getArity)
+    // partition
+    val partitioned: DataSet[Row] = dataSet.partitionByHash(partitionKeys: _*)
+    // sort on all fields
+    sortKeys.foldLeft(partitioned: DataSet[Row]) { (d, i) =>
+      d.sortPartition(i, Order.ASCENDING).asInstanceOf[DataSet[Row]]
+    }
+  }
+
+  /**
+    * Folds identical rows of a data set into a single row with a duplicate count.
+    */
+  private def foldIdenticalRows(
+      dataSet: DataSet[Row],
+      dataSetType: TypeInformation[Row]): DataSet[Row] = {
+
+    val resultType = new RowTypeInfo(dataSetType, Types.INT)
+    val groupKeys = 0 until dataSetType.getArity
+
+    dataSet
+      // group on all fields of the input row
+      .groupBy(groupKeys: _*)
+      // fold identical rows
+      .reduceGroup(new GroupReduceFunction[Row, Row] {
+        val outTuple = new Row(2)
+        override def reduce(values: Iterable[Row], out: Collector[Row]): Unit = {
+          // count number of duplicates
+          var cnt = 0
+          val it = values.iterator()
+          while (it.hasNext) {
+            // set output row
+            outTuple.setField(0, it.next())
+            cnt += 1
+          }
+          // set count
+          outTuple.setField(1, cnt)
+          // emit folded row with count
+          out.collect(outTuple)
+        }
+      })
+      .returns(resultType)
+      .withForwardedFields("*->f0")
+      .name("fold identical rows")
+  }
+
+  /**
+    * Generates a [[GeneratedFunction]] of a [[JoinFunction]] to evaluate the join predicate.
+    * The function returns the result of the predicate as [[JBool]].
+    */
+  private def generatePredicateFunction(
+      leftType: TypeInformation[Row],
+      rightType: TypeInformation[Row],
+      config: TableConfig): GeneratedFunction[JoinFunction[Row, Row, JBool], JBool] = {
+    val predGenerator = new FunctionCodeGenerator(config, false, leftType, Some(rightType))
+    val condition = predGenerator.generateExpression(joinCondition)
+    val predCode =
+      s"""
+         |${condition.code}
+         |return (${condition.resultTerm});
+         |""".stripMargin
+
+    predGenerator.generateFunction(
+      "OuterJoinPredicate",
+      classOf[JoinFunction[Row, Row, JBool]],
+      predCode,
+      Types.BOOLEAN)
+  }
+
+  /**
+    * Generates a [[GeneratedFunction]] of a [[JoinFunction]] to produce the join result.
+    */
+  private def generateConversionFunction(
+      leftType: TypeInformation[Row],
+      rightType: TypeInformation[Row],
+      resultType: TypeInformation[Row],
+      config: TableConfig): GeneratedFunction[JoinFunction[Row, Row, Row], Row] = {
+
+    val conversionGenerator = new FunctionCodeGenerator(config, true, leftType, Some(rightType))
+    val conversion = conversionGenerator.generateConverterResultExpression(
+      resultType,
+      joinRowType.getFieldNames)
+    val convCode =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    conversionGenerator.generateFunction(
+      "OuterJoinConverter",
+      classOf[JoinFunction[Row, Row, Row]],
+      convCode,
+      resultType)
+  }
+
 }
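
The outer join strategies above hinge on foldIdenticalRows: identical input rows are folded into a (row, count) pair before the join, so the final group reduce can restore the original multiplicity after null-padding. A minimal sketch of the idea on plain Scala collections (hypothetical data, not the Flink API):

  // left input rows, keyed on the Int field; ("a", 1) occurs twice
  val left = Seq(("a", 1), ("a", 1), ("b", 2))

  // foldIdenticalRows: keep one representative per distinct row plus its count
  val folded: Seq[((String, Int), Int)] =
    left.groupBy(identity).map { case (row, dups) => (row, dups.size) }.toSeq

  // after the outer join, a row without a match is padded with None; the
  // reduce step re-emits each result once per original duplicate
  val rightKeys = Set(2)
  val result = folded.flatMap { case (row @ (_, key), cnt) =>
    val joined = if (rightKeys.contains(key)) (row, Some("match")) else (row, None)
    Seq.fill(cnt)(joined)
  }
  // result contains (("a", 1), None) twice and (("b", 2), Some("match")) once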

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
index 33c4caf..869ab31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -77,7 +77,7 @@ private class FlinkLogicalJoinConverter
     val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
     val joinInfo = join.analyzeCondition
 
-    hasEqualityPredicates(join, joinInfo) || isSingleRowJoin(join)
+    hasEqualityPredicates(joinInfo) || isSingleRowJoin(join)
   }
 
   override def convert(rel: RelNode): RelNode = {
@@ -95,10 +95,9 @@ private class FlinkLogicalJoinConverter
       join.getJoinType)
   }
 
-  private def hasEqualityPredicates(join: LogicalJoin, joinInfo: JoinInfo): Boolean = {
+  private def hasEqualityPredicates(joinInfo: JoinInfo): Boolean = {
     // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
-    // and disable outer joins with non-equality predicates(see FLINK-5520)
-    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+    !joinInfo.pairs().isEmpty
   }
 
   private def isSingleRowJoin(join: LogicalJoin): Boolean = {

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
index d880b35..eded45f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -41,8 +41,7 @@ class DataSetJoinRule
     val joinInfo = join.analyzeCondition
 
     // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
-    // and disable outer joins with non-equality predicates(see FLINK-5520)
-    !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+    !joinInfo.pairs().isEmpty
   }
 
   override def convert(rel: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinGroupReduceRunners.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinGroupReduceRunners.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinGroupReduceRunners.scala
new file mode 100644
index 0000000..9b0f08e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinGroupReduceRunners.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.{JoinFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+abstract class OuterJoinGroupReduceRunner(
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[Row])
+  extends RichGroupReduceFunction[Row, Row]
+    with Compiler[JoinFunction[Row, Row, Row]] with Logging {
+
+  protected var function: JoinFunction[Row, Row, Row] = null
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling JoinFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating JoinFunction.")
+    function = clazz.newInstance()
+  }
+}
+
+class LeftOuterJoinGroupReduceRunner(
+    name: String,
+    code: String,
+    returnType: TypeInformation[Row])
+  extends OuterJoinGroupReduceRunner(name, code, returnType) {
+
+  override final def reduce(pairs: Iterable[Row], out: Collector[Row]): Unit = {
+
+    var needsNull = true
+    var left: Row = null
+    var dupCnt: Int = 0
+
+    val pairsIt = pairs.iterator()
+
+    // go over all joined pairs
+    while (pairsIt.hasNext) {
+
+      val pair = pairsIt.next()
+      left = pair.getField(0).asInstanceOf[Row]
+      dupCnt = pair.getField(2).asInstanceOf[Int]
+      val right = pair.getField(1).asInstanceOf[Row]
+
+      if (right != null) {
+        // we have a joining right record. Do not emit a null-padded result record.
+        needsNull = false
+        val result = function.join(left, right)
+        // emit as many result records as the duplication count of the left record
+        var i = dupCnt
+        while (i > 0) {
+          out.collect(result)
+          i -= 1
+        }
+      }
+    }
+
+    // we did not find a single joining right record. Emit null-padded result records.
+    if (needsNull) {
+      val result = function.join(left, null)
+      // emit as many null-padded result records as the duplication count of the left record.
+      while (dupCnt > 0) {
+        out.collect(result)
+        dupCnt -= 1
+      }
+    }
+  }
+}
+
+class RightOuterJoinGroupReduceRunner(
+  name: String,
+  code: String,
+  returnType: TypeInformation[Row])
+  extends OuterJoinGroupReduceRunner(name, code, returnType) {
+
+  override final def reduce(pairs: Iterable[Row], out: Collector[Row]): Unit = {
+
+    var needsNull = true
+    var right: Row = null
+    var dupCnt: Int = 0
+
+    val pairsIt = pairs.iterator()
+
+    // go over all joined pairs
+    while (pairsIt.hasNext) {
+
+      val pair = pairsIt.next()
+      right = pair.getField(1).asInstanceOf[Row]
+      dupCnt = pair.getField(2).asInstanceOf[Int]
+      val left = pair.getField(0).asInstanceOf[Row]
+
+      if (left != null) {
+        // we have a joining left record. Do not emit a null-padded result record.
+        needsNull = false
+        val result = function.join(left, right)
+        // emit as many result records as the duplication count of the right record
+        var i = dupCnt
+        while (i > 0) {
+          out.collect(result)
+          i -= 1
+        }
+      }
+    }
+
+    // we did not find a single joining left record. Emit null-padded result records.
+    if (needsNull) {
+      val result = function.join(null, right)
+      // emit as many null-padded result records as the duplication count of the right record.
+      while (dupCnt > 0) {
+        out.collect(result)
+        dupCnt -= 1
+      }
+    }
+  }
+}
+
+/**
+  * Emits a part of the results of a full outer join:
+  *
+  * - join result from matching join pairs (left + right)
+  * - preserved left rows (left + null)
+  *
+  * Preserved right rows (null + right) are emitted by RightFullOuterJoinGroupReduceRunner.
+  */
+class LeftFullOuterJoinGroupReduceRunner(
+  name: String,
+  code: String,
+  returnType: TypeInformation[Row])
+  extends OuterJoinGroupReduceRunner(name, code, returnType) {
+
+  override final def reduce(pairs: Iterable[Row], out: Collector[Row]): Unit = {
+
+    var needsNull = true
+    var left: Row = null
+    var leftDupCnt: Int = 0
+
+    val pairsIt = pairs.iterator()
+
+    // go over all joined pairs
+    while (pairsIt.hasNext) {
+
+      val pair = pairsIt.next()
+      left = pair.getField(0).asInstanceOf[Row]
+      leftDupCnt = pair.getField(2).asInstanceOf[Int]
+      val right = pair.getField(1).asInstanceOf[Row]
+
+      if (right != null) {
+        // we have a joining right record. Do not emit a null-padded result record.
+        needsNull = false
+        val rightDupCnt = pair.getField(3).asInstanceOf[Int]
+        // emit as many result records as the product of the duplication counts of left and right.
+        var i = leftDupCnt * rightDupCnt
+        val result = function.join(left, right)
+        while (i > 0) {
+          out.collect(result)
+          i -= 1
+        }
+      }
+    }
+
+    // we did not find a single joining right record. Emit null-padded result records.
+    if (needsNull) {
+      val result = function.join(left, null)
+      // emit as many null-padded result records as the duplication count of the left record.
+      while (leftDupCnt > 0) {
+        out.collect(result)
+        leftDupCnt -= 1
+      }
+    }
+  }
+}
+
+/**
+  * Emits a part of the results of a full outer join:
+  *
+  * - preserved right rows (null + right)
+  *
+  * Join result from matching join pairs (left + right) and preserved left rows (left + null) are
+  * emitted by LeftFullOuterJoinGroupReduceRunner.
+  */
+class RightFullOuterJoinGroupReduceRunner(
+  name: String,
+  code: String,
+  returnType: TypeInformation[Row])
+  extends OuterJoinGroupReduceRunner(name, code, returnType) {
+
+  override final def reduce(pairs: Iterable[Row], out: Collector[Row]): Unit = {
+
+    var needsNull = true
+    var right: Row = null
+    var rightDupCnt: Int = 0
+
+    val pairsIt = pairs.iterator()
+
+    // go over all joined pairs
+    while (pairsIt.hasNext && needsNull) {
+
+      val pair = pairsIt.next()
+      right = pair.getField(1).asInstanceOf[Row]
+      rightDupCnt = pair.getField(3).asInstanceOf[Int]
+      val left = pair.getField(0).asInstanceOf[Row]
+
+      if (left != null) {
+        // we have a joining left record. Do not emit a null-padded result record.
+        needsNull = false
+        // we do NOT emit join results here. This was done by LeftFullOuterJoinGroupReduceRunner.
+      }
+    }
+
+    // we did not find a single joining left record. Emit null-padded result records.
+    if (needsNull) {
+      val result = function.join(null, right)
+      // emit as many null-padded result records as the duplication count of the right record.
+      while (rightDupCnt > 0) {
+        out.collect(result)
+        rightDupCnt -= 1
+      }
+    }
+  }
+}
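
For reference, the join-pair rows these runners index into are produced by the runners in outerJoinRunners.scala (next file). The assumed layout, following the Row.setField calls there:

  import org.apache.flink.types.Row

  // left/right outer join pair: [ left, rightOrNull, dupCnt ]                  (arity 3)
  // full outer join pair:       [ leftOrNull, rightOrNull, leftCnt, rightCnt ] (arity 4)
  val right = new Row(2)
  right.setField(0, "x")
  right.setField(1, 42)

  val pair = new Row(4)
  pair.setField(0, null)  // no left row joined
  pair.setField(1, right) // preserved right row
  pair.setField(2, null)  // no left duplicate count
  pair.setField(3, 3)     // the right row occurred three times in its input
  // for such a group, RightFullOuterJoinGroupReduceRunner emits three null-padded results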

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinRunners.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinRunners.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinRunners.scala
new file mode 100644
index 0000000..a9e0211
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinRunners.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.functions.{JoinFunction, RichFlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+abstract class OuterJoinRunner(
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[Row])
+  extends RichFlatJoinFunction[Row, Row, Row]
+  with ResultTypeQueryable[Row]
+  with Compiler[JoinFunction[Row, Row, JBool]]
+  with Logging {
+
+  protected var function: JoinFunction[Row, Row, JBool] = null
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatJoinFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}
+
+/**
+  * Emits left outer join pairs of left and right rows.
+  * Left rows are always preserved if no matching right row is found (predicate evaluates to false
+  * or right input row is null).
+  */
+class LeftOuterJoinRunner(
+    name: String,
+    code: String,
+    returnType: TypeInformation[Row])
+  extends OuterJoinRunner(name, code, returnType) {
+
+  val outRow = new Row(3)
+
+  override final def join(leftWithCnt: Row, right: Row, out: Collector[Row]): Unit = {
+
+    val left: Row = leftWithCnt.getField(0).asInstanceOf[Row]
+    val leftCnt = leftWithCnt.getField(1)
+
+    outRow.setField(0, left)
+    outRow.setField(2, leftCnt)
+
+    if (right == null) {
+      // right input row is null. Emit pair with null as right row
+      outRow.setField(1, null)
+    } else {
+      // evaluate predicate.
+      if (function.join(left, right)) {
+        // emit pair with right row
+        outRow.setField(1, right)
+      } else {
+        // emit pair with null as right row
+        outRow.setField(1, null)
+      }
+    }
+    out.collect(outRow)
+
+  }
+}
+
+/**
+  * Emits right outer join pairs of left and right rows.
+  * Right rows are always preserved if no matching left row is found (predicate evaluates to false
+  * or left input row is null).
+  */
+class RightOuterJoinRunner(
+  name: String,
+  code: String,
+  returnType: TypeInformation[Row])
+  extends OuterJoinRunner(name, code, returnType) {
+
+  val outRow = new Row(3)
+
+  override final def join(left: Row, rightWithCnt: Row, out: Collector[Row]): Unit = {
+
+    val right: Row = rightWithCnt.getField(0).asInstanceOf[Row]
+    val rightCnt = rightWithCnt.getField(1)
+
+    outRow.setField(1, right)
+    outRow.setField(2, rightCnt)
+
+    if (left == null) {
+      // left input row is null. Emit pair with null as left row
+      outRow.setField(0, null)
+    } else {
+      // evaluate predicate.
+      if (function.join(left, right)) {
+        outRow.setField(0, left)
+      } else {
+        outRow.setField(0, null)
+      }
+    }
+    out.collect(outRow)
+  }
+}
+
+/**
+  * Emits full outer join pairs of left and right rows.
+  * Left and right rows are always preserved if no matching row from the other input is found
+  * (predicate evaluates to false or the other input row is null).
+  */
+class FullOuterJoinRunner(
+  name: String,
+  code: String,
+  returnType: TypeInformation[Row])
+  extends OuterJoinRunner(name, code, returnType) {
+
+  val outRow = new Row(4)
+
+  override final def join(leftWithCnt: Row, rightWithCnt: Row, out: Collector[Row]): Unit = {
+
+    if (leftWithCnt == null) {
+      // left row is null. Emit join pair with null as left row.
+      val right: Row = rightWithCnt.getField(0).asInstanceOf[Row]
+      val rightCnt = rightWithCnt.getField(1)
+
+      outRow.setField(0, null)
+      outRow.setField(1, right)
+      outRow.setField(2, null)
+      outRow.setField(3, rightCnt)
+      out.collect(outRow)
+    } else if (rightWithCnt == null) {
+      // right row is null. Emit join pair with null as right row.
+      val left: Row = leftWithCnt.getField(0).asInstanceOf[Row]
+      val leftCnt = leftWithCnt.getField(1)
+
+      outRow.setField(0, left)
+      outRow.setField(1, null)
+      outRow.setField(2, leftCnt)
+      outRow.setField(3, null)
+      out.collect(outRow)
+    } else {
+      // both input rows are not null. Evaluate predicate.
+      val left: Row = leftWithCnt.getField(0).asInstanceOf[Row]
+      val leftCnt = leftWithCnt.getField(1)
+      val right: Row = rightWithCnt.getField(0).asInstanceOf[Row]
+      val rightCnt = rightWithCnt.getField(1)
+
+      if (function.join(left, right)) {
+        // predicate was true. Set rows in join pair
+        outRow.setField(0, left)
+        outRow.setField(1, right)
+        outRow.setField(2, leftCnt)
+        outRow.setField(3, rightCnt)
+        out.collect(outRow)
+      } else {
+        // predicate was false. Emit two join pairs to preserve both input rows.
+        // emit pair with left row and null as right row
+        outRow.setField(0, left)
+        outRow.setField(1, null)
+        outRow.setField(2, leftCnt)
+        outRow.setField(3, null)
+        out.collect(outRow)
+
+        // emit pair with right row and null as left row
+        outRow.setField(0, null)
+        outRow.setField(1, right)
+        outRow.setField(2, null)
+        outRow.setField(3, rightCnt)
+        out.collect(outRow)
+      }
+    }
+  }
+}
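
For a single joined pair, the emission rules above can be summarized by a plain-Scala simulation (a sketch with a hypothetical (value, duplicateCount) row encoding, not the Flink runtime):

  type Pair = (Option[String], Option[String], Option[Int], Option[Int])

  def emitPairs(
      left: Option[(String, Int)],
      right: Option[(String, Int)],
      pred: (String, String) => Boolean): Seq[Pair] = (left, right) match {
    case (None, Some((rv, rc))) => Seq((None, Some(rv), None, Some(rc))) // preserve right
    case (Some((lv, lc)), None) => Seq((Some(lv), None, Some(lc), None)) // preserve left
    case (Some((lv, lc)), Some((rv, rc))) if pred(lv, rv) =>
      Seq((Some(lv), Some(rv), Some(lc), Some(rc)))                      // matching pair
    case (Some((lv, lc)), Some((rv, rc))) =>
      // predicate is false: emit two pairs so both inputs are preserved
      Seq((Some(lv), None, Some(lc), None), (None, Some(rv), None, Some(rc)))
    case (None, None) => Seq.empty                                       // never produced by the join
  }

  // predicate false: both rows survive with their duplicate counts
  assert(emitPairs(Some(("a", 2)), Some(("b", 1)), (_, _) => false).size == 2)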

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/JoinTest.scala
new file mode 100644
index 0000000..a3a597f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/JoinTest.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+
+  @Test
+  def testLeftOuterJoinEquiPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "=(a, z)"),
+        term("join", "a", "b", "y", "z"),
+        term("joinType", "LeftOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testLeftOuterJoinEquiAndLocalPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z AND b < 2"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b", "<(b, 2) AS $f3")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "AND(=(a, z), $f3)"),
+        term("join", "a", "b", "$f3", "y", "z"),
+        term("joinType", "LeftOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testLeftOuterJoinEquiAndNonEquiPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z AND b < x"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        batchTableNode(1),
+        term("where", "AND(=(a, z), <(b, x))"),
+        term("join", "a", "b", "x", "y", "z"),
+        term("joinType", "LeftOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRightOuterJoinEquiPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, y FROM t RIGHT OUTER JOIN s ON a = z"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "=(a, z)"),
+        term("join", "a", "b", "y", "z"),
+        term("joinType", "RightOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRightOuterJoinEquiAndLocalPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, x FROM t RIGHT OUTER JOIN s ON a = z AND x < 2"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "x", "z", "<(x, 2) AS $f3")
+        ),
+        term("where", "AND(=(a, z), $f3)"),
+        term("join", "a", "b", "x", "z", "$f3"),
+        term("joinType", "RightOuterJoin")
+      ),
+      term("select", "b", "x")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRightOuterJoinEquiAndNonEquiPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, y FROM t RIGHT OUTER JOIN s ON a = z AND b < x"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        batchTableNode(1),
+        term("where", "AND(=(a, z), <(b, x))"),
+        term("join", "a", "b", "x", "y", "z"),
+        term("joinType", "RightOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFullOuterJoinEquiPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, y FROM t FULL OUTER JOIN s ON a = z"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "=(a, z)"),
+        term("join", "a", "b", "y", "z"),
+        term("joinType", "FullOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFullOuterJoinEquiAndLocalPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, y FROM t FULL OUTER JOIN s ON a = z AND b < 2 AND z > 5"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b", "<(b, 2) AS $f3")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z", ">(z, 5) AS $f3")
+        ),
+        term("where", "AND(=(a, z), $f3, $f30)"),
+        term("join", "a", "b", "$f3", "y", "z", "$f30"),
+        term("joinType", "FullOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFullOuterJoinEquiAndNonEquiPred(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+    util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+    val query = "SELECT b, y FROM t FULL OUTER JOIN s ON a = z AND b < x"
+    val result = util.tableEnv.sqlQuery(query)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        batchTableNode(1),
+        term("where", "AND(=(a, z), <(b, x))"),
+        term("join", "a", "b", "x", "y", "z"),
+        term("joinType", "FullOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
index d9e0e10..628bf5f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
@@ -83,67 +83,34 @@ class JoinValidationTest extends TableTestBase {
   }
 
   @Test(expected = classOf[TableException])
-  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+  def testRightOuterJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
     util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
-    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
+    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b < e"
 
     util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
-  def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+  def testLeftOuterJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
     util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
-    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
+    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b > e"
 
     util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
-  def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
+  def testFullOuterJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
     util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
-    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
-
-    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRightOuterJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
-
-    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testLeftOuterJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
-
-    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testFullOuterJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-    util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
+    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b <> e"
 
     util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }

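The renamed tests pin down the new contract for outer joins: non-equi and local
predicates are now accepted as long as at least one equi-join predicate is
present. A minimal sketch of the distinction, reusing the tables registered in
the tests above (the exception surfaces when the query is translated):

    // supported since this change: equi-join predicate plus a non-equi predicate
    util.tableEnv.sqlQuery(
      "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e AND a > d")

    // still rejected: no equi-join predicate at all (TableException)
    util.tableEnv.sqlQuery(
      "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b <> e").toDataSet[Row]
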
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
new file mode 100644
index 0000000..9ee7fc2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+
+  @Test
+  def testLeftOuterJoinEquiPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.leftOuterJoin(s, 'a === 'z).select('b, 'y)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "=(a, z)"),
+        term("join", "a", "b", "y", "z"),
+        term("joinType", "LeftOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+
+  @Test
+  def testLeftOuterJoinEquiAndLocalPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.leftOuterJoin(s, 'a === 'z && 'b < 2).select('b, 'y)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "AND(=(a, z), <(b, 2))"),
+        term("join", "a", "b", "y", "z"),
+        term("joinType", "LeftOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+
+  @Test
+  def testLeftOuterJoinEquiAndNonEquiPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.leftOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        batchTableNode(1),
+        term("where", "AND(=(a, z), <(b, x))"),
+        term("join", "a", "b", "x", "y", "z"),
+        term("joinType", "LeftOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+
+  @Test
+  def testRightOuterJoinEquiPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.rightOuterJoin(s, 'a === 'z).select('b, 'y)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "=(a, z)"),
+        term("join", "a", "b", "y", "z"),
+        term("joinType", "RightOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+
+  @Test
+  def testRightOuterJoinEquiAndLocalPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.rightOuterJoin(s, 'a === 'z && 'x < 2).select('b, 'x)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "x", "z")
+        ),
+        term("where", "AND(=(a, z), <(x, 2))"),
+        term("join", "a", "b", "x", "z"),
+        term("joinType", "RightOuterJoin")
+      ),
+      term("select", "b", "x")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+
+  @Test
+  def testRightOuterJoinEquiAndNonEquiPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.rightOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        batchTableNode(1),
+        term("where", "AND(=(a, z), <(b, x))"),
+        term("join", "a", "b", "x", "y", "z"),
+        term("joinType", "RightOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+
+  @Test
+  def testFullOuterJoinEquiPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.fullOuterJoin(s, 'a === 'z).select('b, 'y)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "=(a, z)"),
+        term("join", "a", "b", "y", "z"),
+        term("joinType", "FullOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+
+  @Test
+  def testFullOuterJoinEquiAndLocalPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.fullOuterJoin(s, 'a === 'z && 'b < 2).select('b, 'y)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(1),
+          term("select", "y", "z")
+        ),
+        term("where", "AND(=(a, z), <(b, 2))"),
+        term("join", "a", "b", "y", "z"),
+        term("joinType", "FullOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+
+  @Test
+  def testFullOuterJoinEquiAndNonEquiPred(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+    val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+    val joined = t.fullOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a", "b")
+        ),
+        batchTableNode(1),
+        term("where", "AND(=(a, z), <(b, x))"),
+        term("join", "a", "b", "x", "y", "z"),
+        term("joinType", "FullOuterJoin")
+      ),
+      term("select", "b", "y")
+    )
+
+    util.verifyTable(joined, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
index 3cc278b..e2ecd38 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
@@ -92,16 +92,7 @@ class JoinValidationTest extends TableTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
-  def testNoJoinCondition(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoEquiJoin(): Unit = {
+  def testLeftJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
     val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
@@ -110,57 +101,21 @@ class JoinValidationTest extends TableTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
-  def testRightJoinWithNonEquiJoinPredicate(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testLeftJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFullJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testRightJoinWithLocalPredicate(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testLeftJoinWithNonEquiJoinPredicate(): Unit = {
+  def testRightJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
     val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
-    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+    ds2.rightOuterJoin(ds1, 'b < 'd).select('c, 'g)
   }
 
   @Test(expected = classOf[ValidationException])
-  def testFullJoinWithNonEquiJoinPredicate(): Unit = {
+  def testFullJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
     val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
-    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+    ds2.fullOuterJoin(ds1, 'b < 'd).select('c, 'g)
   }
 
   @Test(expected = classOf[ValidationException])

http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
index ddf622c..20348c4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
@@ -18,16 +18,21 @@
 
 package org.apache.flink.table.runtime.batch.table
 
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.MapPartitionFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.utils.TableFunc2
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -36,8 +41,9 @@ import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
 class JoinITCase(
+    execMode: TestExecutionMode,
     configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
+  extends TableProgramsClusterTestBase(execMode, configMode) {
 
   @Test
   def testJoin(): Unit = {
@@ -106,8 +112,10 @@ class JoinITCase(
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val ds1 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
 
@@ -214,8 +222,10 @@ class JoinITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val ds1 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
 
@@ -225,38 +235,144 @@ class JoinITCase(
       "Comment#3,null\n" + "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
       "Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" + "Comment#10,null\n" +
       "Comment#11,null\n" + "Comment#12,null\n" + "Comment#13,null\n" + "Comment#14,null\n" +
-      "Comment#15,null\n"
+      "Comment#15,null\n" +
+      "NullTuple,null\n" + "NullTuple,null\n"
     val results = joinT.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
+  def testLeftJoinWithNonEquiJoinPred(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g)
+
+    val expected = Seq(
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC",
+      "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK",
+      "Hello world, how are you?,null", "Luke Skywalker,null", "Comment#1,null", "Comment#2,null",
+      "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", "Comment#7,null",
+      "Comment#8,null", "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null",
+      "Comment#13,null", "Comment#14,null", "Comment#15,null",
+      "NullTuple,null", "NullTuple,null")
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+  }
+
+  @Test
+  def testLeftJoinWithLeftLocalPred(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 2).select('c, 'g)
+
+    val expected = Seq(
+      "Hello,Hallo Welt", "Hello,Hallo Welt wie",
+      "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", "Hello world,BCD",
+      "Hi,null", "Hello world, how are you?,null", "I am fine.,null", "Luke Skywalker,null",
+      "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", "Comment#5,null",
+      "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null",
+      "Comment#11,null", "Comment#12,null", "Comment#13,null", "Comment#14,null", "Comment#15,null",
+      "NullTuple,null", "NullTuple,null")
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+  }
+
+  @Test
   def testRightJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val ds1 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
       "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
       "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n"
+      "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n" +
+      "null,NullTuple\n" + "null,NullTuple\n"
     val results = joinT.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
+  def testRightJoinWithNonEquiJoinPred(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val ds2 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+
+    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g)
+
+    val expected = Seq(
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC",
+      "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK",
+      "Hello world, how are you?,null", "Luke Skywalker,null", "Comment#1,null", "Comment#2,null",
+      "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", "Comment#7,null",
+      "Comment#8,null", "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null",
+      "Comment#13,null", "Comment#14,null", "Comment#15,null",
+      "NullTuple,null", "NullTuple,null")
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+  }
+
+  @Test
+  def testRightJoinWithLeftLocalPred(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val ds2 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+
+    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 2).select('c, 'g)
+
+    val expected = Seq(
+      "Hello,Hallo Welt", "Hello,Hallo Welt wie",
+      "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", "Hello world,BCD",
+      "Hi,null", "Hello world, how are you?,null", "I am fine.,null", "Luke Skywalker,null",
+      "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", "Comment#5,null",
+      "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null",
+      "Comment#11,null", "Comment#12,null", "Comment#13,null", "Comment#14,null", "Comment#15,null",
+      "NullTuple,null", "NullTuple,null")
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+  }
+
+  @Test
   def testFullOuterJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val ds1 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
     val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
 
@@ -268,12 +384,74 @@ class JoinITCase(
       "Comment#5,null\n" + "Comment#6,null\n" + "Comment#7,null\n" + "Comment#8,null\n" +
       "Comment#9,null\n" + "Comment#10,null\n" + "Comment#11,null\n" + "Comment#12,null\n" +
       "Comment#13,null\n" + "Comment#14,null\n" + "Comment#15,null\n" +
-      "Hello world, how are you?,null\n"
+      "Hello world, how are you?,null\n" +
+      "NullTuple,null\n" + "NullTuple,null\n" + "null,NullTuple\n" + "null,NullTuple\n"
     val results = joinT.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
   @Test
+  def testFullJoinWithNonEquiJoinPred(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g)
+
+    val expected = Seq(
+      // join matches
+      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC",
+      "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK",
+      // preserved left
+      "Hello world, how are you?,null", "Luke Skywalker,null", "Comment#1,null", "Comment#2,null",
+      "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", "Comment#7,null",
+      "Comment#8,null", "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null",
+      "Comment#13,null", "Comment#14,null", "Comment#15,null", "NullTuple,null", "NullTuple,null",
+      // preserved right
+      "null,Hallo Welt wie", "null,CDE", "null,DEF", "null,EFG", "null,FGH", "null,GHI", "null,JKL",
+      "null,KLM", "null,NullTuple", "null,NullTuple")
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+  }
+
+  @Test
+  def testFullJoinWithLeftLocalPred(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = addNullKey3Tuples(
+      CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = addNullKey5Tuples(
+      CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b >= 2 && 'h === 1).select('c, 'g)
+
+    val expected = Seq(
+      // join matches
+      "Hello,Hallo Welt wie", "Hello world, how are you?,DEF", "Hello world, how are you?,EFG",
+      "I am fine.,GHI",
+      // preserved left
+      "Hi,null", "Hello world,null", "Luke Skywalker,null",
+      "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", "Comment#5,null",
+      "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null",
+      "Comment#11,null", "Comment#12,null", "Comment#13,null", "Comment#14,null", "Comment#15,null",
+      "NullTuple,null", "NullTuple,null",
+      // preserved right
+      "null,Hallo", "null,Hallo Welt", "null,Hallo Welt wie gehts?", "null,ABC", "null,BCD",
+      "null,CDE", "null,FGH", "null,HIJ", "null,IJK", "null,JKL", "null,KLM",
+      "null,NullTuple", "null,NullTuple")
+
+    val results = joinT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+  }
+
+  @Test
   def testUDTFJoinOnTuples(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -295,4 +473,40 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  private def addNullKey3Tuples(rows: DataSet[(Int, Long, String)]) = {
+    rows.mapPartition(
+      new MapPartitionFunction[(Int, Long, String), (Integer, Long, String)] {
+
+        override def mapPartition(
+            vals: Iterable[(Int, Long, String)],
+            out: Collector[(Integer, Long, String)]): Unit = {
+          val it = vals.iterator()
+          while (it.hasNext) {
+            val v = it.next()
+            out.collect((int2Integer(v._1), v._2, v._3))
+          }
+          out.collect((null.asInstanceOf[Integer], 999L, "NullTuple"))
+          out.collect((null.asInstanceOf[Integer], 999L, "NullTuple"))
+        }
+      })
+  }
+
+  private def addNullKey5Tuples(rows: DataSet[(Int, Long, Int, String, Long)]) = {
+    rows.mapPartition(
+      new MapPartitionFunction[(Int, Long, Int, String, Long), (Integer, Long, Int, String, Long)] {
+
+        override def mapPartition(
+            vals: Iterable[(Int, Long, Int, String, Long)],
+            out: Collector[(Integer, Long, Int, String, Long)]): Unit = {
+          val it = vals.iterator()
+          while (it.hasNext) {
+            val v = it.next()
+            out.collect((int2Integer(v._1), v._2, v._3, v._4, v._5))
+          }
+          out.collect((null.asInstanceOf[Integer], 999L, 999, "NullTuple", 999L))
+          out.collect((null.asInstanceOf[Integer], 999L, 999, "NullTuple", 999L))
+        }
+      })
+  }
+
 }

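The addNullKey3Tuples/addNullKey5Tuples helpers above append rows whose join
key is NULL in order to exercise SQL NULL semantics: a NULL key never satisfies
an equi-join predicate, so such rows may only surface as outer-preserved rows
padded with NULLs on the opposite side. A simplified single-key sketch of the
expectation (the tests above use composite keys):

    // NullTuple rows never match on 'a === 'd; a full outer join must still
    // emit each of them once, i.e. "NullTuple,null" and "null,NullTuple"
    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd).select('c, 'g)
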

[2/3] flink git commit: [hotfix] [docs] Synchronize the Table API and SQL documentation of time-windowed joins.

Posted by fh...@apache.org.
[hotfix] [docs] Synchronize the Table API and SQL documentation of time-windowed joins.

This closes #4874.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7da886a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7da886a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7da886a6

Branch: refs/heads/master
Commit: 7da886a666ab2dcfcf4c0c69a04cd6ee34f7e4fc
Parents: 2a63815
Author: Xingcan Cui <xi...@gmail.com>
Authored: Fri Oct 20 22:52:52 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 24 20:28:46 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md      | 10 +++++-----
 docs/dev/table/tableApi.md |  8 ++++----
 2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7da886a6/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 8a86f72..0eef48b 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -400,15 +400,15 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires a special join condition that bounds the time on both sides. This can be done by either two appropriate range predicates (<code> &lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+        <p>A time-windowed join requires at least one equi-join predicate and a special join 
+        condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
           <ul>
-            <li>Time predicates must compare time attributes of both input tables.</li>
-            <li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li>
-            <li>Only range predicates are valid time predicates.</li>
+            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
+            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
           </ul>
         </p>
 
-        <p><b>Note:</b> Currently, only <code>INNER</code> joins are supported.</p>
+        <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight sql %}
 SELECT *

http://git-wip-us.apache.org/repos/asf/flink/blob/7da886a6/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 9380025..07b301d 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -527,9 +527,9 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code> &lt;, &lt;=, &gt;=, &gt;</code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+        <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
           <ul>
-            <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li>
+            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
             <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
           </ul>
         </p>
@@ -644,9 +644,9 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code> &lt;, &lt;=, &gt;=, &gt;</code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+        <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
           <ul>
-            <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li>
+            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
             <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
           </ul>
         </p>

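To make the synchronized rules concrete, a hypothetical time-windowed join in
the Scala Table API (the names 'a, 'b, 'ltime and 'rtime are placeholders;
'ltime and 'rtime are time attributes of the same type):

    val result = left.join(right)
      .where('a === 'b && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)

One equi-join predicate ('a === 'b) plus two range predicates that bound the
time attributes on both sides satisfies the documented requirements.
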

[3/3] flink git commit: [FLINK-7821] [table] Deprecate Table.limit() and replace it by Table.offset() and Table.fetch().

Posted by fh...@apache.org.
[FLINK-7821] [table] Deprecate Table.limit() and replace it by Table.offset() and Table.fetch().

This closes #4813.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1c7e490
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1c7e490
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1c7e490

Branch: refs/heads/master
Commit: e1c7e490395c44913e816966b2fa3b8bfeb18160
Parents: 7da886a
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 12 10:56:19 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 24 20:28:46 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/tableApi.md                      | 42 +++++++------
 .../org/apache/flink/table/api/table.scala      | 66 +++++++++++++++++++-
 .../table/validation/SortValidationTest.scala   | 34 +++++++++-
 .../validation/CorrelateValidationTest.scala    |  4 +-
 .../UnsupportedOpsValidationTest.scala          | 12 +++-
 .../table/runtime/batch/table/SortITCase.scala  |  6 +-
 6 files changed, 135 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 07b301d..f0c4605 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -959,7 +959,7 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
 
 {% top %}
 
-### OrderBy & Limit
+### OrderBy, Offset & Fetch
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -987,19 +987,22 @@ Table result = in.orderBy("a.asc");
 
     <tr>
       <td>
-        <strong>Limit</strong><br>
+        <strong>Offset &amp; Fetch</strong><br>
         <span class="label label-primary">Batch</span>
       </td>
       <td>
-        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+        <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+
+// returns the first 5 records from the sorted result
+Table result1 = in.orderBy("a.asc").fetch(5); 
+
+// skips the first 3 records and returns all following records from the sorted result
+Table result2 = in.orderBy("a.asc").offset(3);
+
+// skips the first 10 records and returns the next 5 records from the sorted result
+Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
 {% endhighlight %}
       </td>
     </tr>
@@ -1033,19 +1036,22 @@ val result = in.orderBy('a.asc);
 
     <tr>
       <td>
-        <strong>Limit</strong><br>
+        <strong>Offset &amp; Fetch</strong><br>
         <span class="label label-primary">Batch</span>
       </td>
       <td>
-        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+        <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
 {% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc).limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with the 4th record
+val in = ds.toTable(tableEnv, 'a, 'b, 'c)
+
+// returns the first 5 records from the sorted result
+val result1: Table = in.orderBy('a.asc).fetch(5)
+
+// skips the first 3 records and returns all following records from the sorted result
+val result2: Table = in.orderBy('a.asc).offset(3)
+
+// skips the first 10 records and returns the next 5 records from the sorted result
+val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)
 {% endhighlight %}
       </td>
     </tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 30ed98e..0430e49 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -25,7 +25,6 @@ import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionPar
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
 import org.apache.flink.table.plan.logical.{Minus, _}
-import org.apache.flink.table.plan.schema.TableSinkTable
 import org.apache.flink.table.sinks.TableSink
 
 import _root_.scala.annotation.varargs
@@ -721,12 +720,16 @@ class Table(
     * Example:
     *
     * {{{
-    *   // returns unlimited number of records beginning with the 4th record
+    *   // skips the first 3 rows and returns all following rows.
     *   tab.orderBy('name.desc).limit(3)
     * }}}
     *
     * @param offset number of records to skip
+    *
+    * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
     */
+  @Deprecated
+  @deprecated(message = "Deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
   def limit(offset: Int): Table = {
     new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
   }
@@ -739,18 +742,75 @@ class Table(
     * Example:
     *
     * {{{
-    *   // returns 5 records beginning with the 4th record
+    *   // skips the first 3 rows and returns the next 5 rows.
     *   tab.orderBy('name.desc).limit(3, 5)
     * }}}
     *
     * @param offset number of records to skip
     * @param fetch number of records to be returned
+    *
+    * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
     */
+  @Deprecated
+  @deprecated(message = "deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
   def limit(offset: Int, fetch: Int): Table = {
     new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
   }
 
   /**
+    * Limits a sorted result from an offset position.
+    * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return n rows
+    * after skipping the first o rows.
+    *
+    * {{{
+    *   // skips the first 3 rows and returns all following rows.
+    *   tab.orderBy('name.desc).offset(3)
+    *   // skips the first 10 rows and returns the next 5 rows.
+    *   tab.orderBy('name.desc).offset(10).fetch(5)
+    * }}}
+    *
+    * @param offset number of records to skip
+    */
+  def offset(offset: Int): Table = {
+    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Limits a sorted result to the first n rows.
+    * Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * [[Table.fetch(n)]] can be combined with a preceding [[Table.offset(o)]] call to return n rows
+    * after skipping the first o rows.
+    *
+    * {{{
+    *   // returns the first 3 records.
+    *   tab.orderBy('name.desc).fetch(3)
+    *   // skips the first 10 rows and returns the next 5 rows.
+    *   tab.orderBy('name.desc).offset(10).fetch(5)
+    * }}}
+    *
+    * @param fetch the number of records to return. Fetch must be >= 0.
+    */
+  def fetch(fetch: Int): Table = {
+    if (fetch < 0) {
+      throw ValidationException("FETCH count must be equal or larger than 0.")
+    }
+    this.logicalPlan match {
+      case Limit(o, -1, c) =>
+        // replace LIMIT without FETCH by LIMIT with FETCH
+        new Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))
+      case Limit(_, _, _) =>
+        throw ValidationException("FETCH is already defined.")
+      case _ =>
+        new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))
+    }
+  }
+
+  /**
     * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
     *
     * A batch [[Table]] can only be written to a

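The test changes below (SortValidationTest, CorrelateValidationTest, SortITCase)
show the mechanical migration away from the deprecated calls; as a quick
reference, assuming a table named in, sorted on 'a:

    in.orderBy('a.asc).limit(3)             // deprecated: skip 3, return the rest
    in.orderBy('a.asc).offset(3)            // replacement

    in.orderBy('a.asc).limit(3, 5)          // deprecated: skip 3, return next 5
    in.orderBy('a.asc).offset(3).fetch(5)   // replacement

    in.orderBy('a.asc).limit(0, 5)          // deprecated: return the first 5
    in.orderBy('a.asc).fetch(5)             // replacement
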
http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
index 5064687..24156e4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
@@ -27,10 +27,42 @@ import org.junit._
 class SortValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
+  def testOffsetWithoutOrder(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    ds.offset(5)
+  }
+
+  @Test(expected = classOf[ValidationException])
   def testFetchWithoutOrder(): Unit = {
     val util = batchTestUtil()
     val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
-    ds.limit(0, 5)
+    ds.fetch(5)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFetchBeforeOffset(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    ds.orderBy('a.asc).fetch(5).offset(10)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testOffsetBeforeOffset(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    ds.orderBy('a.asc).offset(10).offset(5)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNegativeFetch(): Unit = {
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    ds.orderBy('a.asc).fetch(-1)
   }
 }

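Taken together, these tests encode the call-ordering rules of the new API
(a sketch; every line marked invalid throws a ValidationException):

    in.orderBy('a.asc).offset(10).fetch(5)   // valid: offset, then fetch
    in.offset(5)                             // invalid: no preceding orderBy
    in.orderBy('a.asc).fetch(5).offset(10)   // invalid: offset after fetch
    in.orderBy('a.asc).offset(10).offset(5)  // invalid: offset applied twice
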
http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index c754afd..8f429e1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -86,13 +86,13 @@ class CorrelateValidationTest extends TableTestBase {
 
     // table function call limit
     expectExceptionThrown(
-      func1('c).orderBy('f0).limit(3),
+      func1('c).orderBy('f0).offset(3),
       "TableFunction can only be used in join and leftOuterJoin."
     )
 
     // table function call limit
     expectExceptionThrown(
-      func1('c).orderBy('f0).limit(0, 3),
+      func1('c).orderBy('f0).fetch(3),
       "TableFunction can only be used in join and leftOuterJoin."
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
index de7f70a..c1ad08c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -89,11 +89,19 @@ class UnsupportedOpsValidationTest extends StreamingMultipleProgramsTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
-  def testLimit(): Unit = {
+  def testOffset(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.limit(0,5)
+    t1.offset(5)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFetch(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.fetch(5)
   }
 
   @Test(expected = classOf[ValidationException])

http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala
index 568fad0..79bb711 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala
@@ -143,7 +143,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).offset(3)
     implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       x.productElement(0).asInstanceOf[Int] )
 
@@ -171,7 +171,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
+    val t = ds.toTable(tEnv).orderBy('_1.desc).offset(3).fetch(5)
     implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       - x.productElement(0).asInstanceOf[Int] )
 
@@ -199,7 +199,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).fetch(5)
     implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
       x.productElement(0).asInstanceOf[Int] )