You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/24 18:29:04 UTC
[1/3] flink git commit: [FLINK-7755] [table] Fix NULL handling in batch joins.
Repository: flink
Updated Branches:
refs/heads/master 512b9c478 -> e1c7e4903
[FLINK-7755] [table] Fix NULL handling in batch joins.
- Also fixes [FLINK-5498]: Add support for non-equi join and local predicates to outer joins.
This closes #4858.
This closes #3379 (stale PR for FLINK-5498).
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a638155
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a638155
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a638155
Branch: refs/heads/master
Commit: 2a6381553e48e0145655c0aadd173813de610aa7
Parents: 512b9c4
Author: Fabian Hueske <fh...@apache.org>
Authored: Sun Oct 15 17:55:23 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 24 20:28:11 2017 +0200
----------------------------------------------------------------------
.../table/codegen/FunctionCodeGenerator.scala | 12 +
.../table/codegen/calls/ScalarOperators.scala | 38 +-
.../flink/table/plan/logical/operators.scala | 15 -
.../table/plan/nodes/dataset/DataSetJoin.scala | 419 +++++++++++++++++--
.../plan/nodes/logical/FlinkLogicalJoin.scala | 7 +-
.../plan/rules/dataSet/DataSetJoinRule.scala | 3 +-
.../runtime/outerJoinGroupReduceRunners.scala | 244 +++++++++++
.../flink/table/runtime/outerJoinRunners.scala | 195 +++++++++
.../flink/table/api/batch/sql/JoinTest.scala | 314 ++++++++++++++
.../sql/validation/JoinValidationTest.scala | 45 +-
.../flink/table/api/batch/table/JoinTest.scala | 304 ++++++++++++++
.../table/validation/JoinValidationTest.scala | 55 +--
.../table/runtime/batch/table/JoinITCase.scala | 240 ++++++++++-
13 files changed, 1702 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
index e86c4ab..2bd2fe7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
@@ -125,6 +125,18 @@ class FunctionCodeGenerator(
s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
}
+ // JoinFunction
+ else if (clazz == classOf[JoinFunction[_, _, _]]) {
+ val baseClass = classOf[RichJoinFunction[_, _, _]]
+ val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
+ val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
+ throw new CodeGenException("Input 2 for JoinFunction should not be null")))
+ (baseClass,
+ s"Object join(Object _in1, Object _in2)",
+ List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
+ s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
+ }
+
// ProcessFunction
else if (clazz == classOf[ProcessFunction[_, _]]) {
val baseClass = classOf[ProcessFunction[_, _]]
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 7de7aca..bd5b1f7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -382,33 +382,23 @@ object ScalarOperators {
|boolean $resultTerm = false;
|boolean $nullTerm = false;
|if (!${left.nullTerm} && !${left.resultTerm}) {
- | // left expr is false, skip right expr
+ | // left expr is false, result is always false
+ | // skip right expr
|} else {
| ${right.code}
|
- | if (!${left.nullTerm} && !${right.nullTerm}) {
- | $resultTerm = ${left.resultTerm} && ${right.resultTerm};
- | $nullTerm = false;
- | }
- | else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
- | $resultTerm = false;
- | $nullTerm = true;
- | }
- | else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
- | $resultTerm = false;
- | $nullTerm = false;
- | }
- | else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
- | $resultTerm = false;
- | $nullTerm = true;
- | }
- | else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
- | $resultTerm = false;
- | $nullTerm = false;
- | }
- | else {
- | $resultTerm = false;
- | $nullTerm = true;
+ | if (${left.nullTerm}) {
+ | // left is null (unknown)
+ | if (${right.nullTerm} || ${right.resultTerm}) {
+ | $nullTerm = true;
+ | }
+ | } else {
+ | // left is true
+ | if (${right.nullTerm}) {
+ | $nullTerm = true;
+ | } else if (${right.resultTerm}) {
+ | $resultTerm = true;
+ | }
| }
|}
""".stripMargin
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index e723eef..fe2bfe5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -457,8 +457,6 @@ case class Join(
}
var equiJoinPredicateFound = false
- var nonEquiJoinPredicateFound = false
- var localPredicateFound = false
// Whether the predicate is literal true.
val alwaysTrue = expression match {
case x: Literal if x.value.equals(true) => true
@@ -472,15 +470,7 @@ case class Join(
if (isAndBranch && checkIfJoinCondition(x)) {
equiJoinPredicateFound = true
}
- if (checkIfFilterCondition(x)) {
- localPredicateFound = true
- }
case x: BinaryComparison =>
- if (checkIfFilterCondition(x)) {
- localPredicateFound = true
- } else {
- nonEquiJoinPredicateFound = true
- }
// The boolean literal should be a valid condition type.
case x: Literal if x.resultType == Types.BOOLEAN =>
case x => failValidation(
@@ -500,11 +490,6 @@ case class Join(
s"Invalid join condition: $expression. At least one equi-join predicate is " +
s"required.")
}
- if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
- failValidation(
- s"Invalid join condition: $expression. Non-equality join predicates or local" +
- s" predicates are not supported in outer joins.")
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index acbf94d..f039cf9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -18,6 +18,9 @@
package org.apache.flink.table.plan.nodes.dataset
+import java.lang.Iterable
+import java.lang.{Boolean => JBool}
+
import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
@@ -25,15 +28,19 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.util.mapping.IntPair
-import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.functions.{FilterFunction, FlatJoinFunction, GroupReduceFunction, JoinFunction}
+import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableException, Types}
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.nodes.CommonJoin
-import org.apache.flink.table.runtime.FlatJoinRunner
+import org.apache.flink.table.runtime._
import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
@@ -156,65 +163,393 @@ class DataSetJoin(
val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val (joinOperator, nullCheck) = joinType match {
- case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
- case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
- case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
- case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+ joinType match {
+ case JoinRelType.INNER =>
+ addInnerJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.LEFT =>
+ addLeftOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.RIGHT =>
+ addRightOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
+ case JoinRelType.FULL =>
+ addFullOuterJoin(
+ leftDataSet,
+ rightDataSet,
+ leftKeys.toArray,
+ rightKeys.toArray,
+ returnType,
+ config)
}
+ }
- if (nullCheck && !config.getNullCheck) {
- throw TableException("Null check in TableConfig must be enabled for outer joins.")
- }
+ private def addInnerJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
val generator = new FunctionCodeGenerator(
config,
- nullCheck,
- leftDataSet.getType,
- Some(rightDataSet.getType))
+ false,
+ left.getType,
+ Some(right.getType))
val conversion = generator.generateConverterResultExpression(
- returnType,
+ resultType,
joinRowType.getFieldNames)
- var body = ""
+ val condition = generator.generateExpression(joinCondition)
+ val body =
+ s"""
+ |${condition.code}
+ |if (${condition.resultTerm}) {
+ | ${conversion.code}
+ | ${generator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
- if (joinInfo.isEqui) {
- // only equality condition
- body = s"""
- |${conversion.code}
- |${generator.collectorTerm}.collect(${conversion.resultTerm});
- |""".stripMargin
- }
- else {
- val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
- val condition = generator.generateExpression(nonEquiPredicates)
- body = s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${generator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
- }
val genFunction = generator.generateFunction(
ruleDescription,
classOf[FlatJoinFunction[Row, Row, Row]],
body,
- returnType)
+ resultType)
val joinFun = new FlatJoinRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType)
- val joinOpName =
- s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
- s"join: (${joinSelectionToString(joinRowType)})"
+ left.join(right)
+ .where(leftKeys: _*)
+ .equalTo(rightKeys: _*)
+ .`with`(joinFun)
+ .name(getJoinOpName)
+ }
+
+ /**
+ * Translates a left outer join into a DataSet plan:
+ *
+ * 1. hash-partition the left input on the join keys and sort it on all fields,
+ * 2. fold identical left rows into (row, count) records,
+ * 3. left outer join the folded left rows with the right input on the equi-join keys, using a
+ * [[LeftOuterJoinRunner]] built from the generated join predicate; it produces
+ * (left, right, count) join pairs,
+ * 4. group the join pairs by the left row and convert them into result rows with a
+ * [[LeftOuterJoinGroupReduceRunner]], which also emits null-padded rows for left rows
+ * without any joining right row.
+ *
+ * @param left the left input
+ * @param right the right input
+ * @param leftKeys indices of the equi-join key fields of the left input
+ * @param rightKeys indices of the equi-join key fields of the right input
+ * @param resultType type of the produced result rows
+ * @param config table config used for code generation
+ * @return the result of the left outer join
+ * @throws TableException if null checks are disabled in the [[TableConfig]]
+ */
+ private def addLeftOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
+
+ // replace field names by indexed names for easier key handling
+ val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+ val rightType = right.getType.asInstanceOf[RowTypeInfo]
+
+ // partition and sort left input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy)
+ val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
+
+ // fold identical rows of the left input
+ // (each folded record holds the row in f0 and its duplicate count in f1)
+ val foldedRowsLeft: DataSet[Row] = foldIdenticalRows(partitionedSortedLeft, leftType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ // join pairs are (left row, right row, duplicate count of the left row)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
+ val joinFun = new LeftOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ // the left key fields are nested in field f0 of the folded records
+ val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
+ val joinPairs = foldedRowsLeft.leftOuterJoin(right, JoinHint.REPARTITION_SORT_MERGE)
+ .where(nestedLeftKeys: _*)
+ .equalTo(rightKeys: _*)
+ .`with`(joinFun)
+ .withForwardedFieldsFirst("f0->f0")
+ .name(joinOpName)
+
+ // create GroupReduceFunction to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val reduceFun = new LeftOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // convert join pairs to result.
+ // This step ensures we preserve the rows of the left input.
+ joinPairs
+ .groupBy("f0")
+ .reduceGroup(reduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+ }
+
+ private def addRightOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
- joinOperator
- .where(leftKeys.toArray: _*)
- .equalTo(rightKeys.toArray: _*)
+ // replace field names by indexed names for easier key handling
+ val leftType = left.getType.asInstanceOf[RowTypeInfo]
+ val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+
+ // partition and sort right input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy)
+ val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
+
+ // fold identical rows of the right input
+ val foldedRowsRight: DataSet[Row] = foldIdenticalRows(partitionedSortedRight, rightType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
+ val joinFun = new RightOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
+ val joinPairs = left.rightOuterJoin(foldedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
+ .where(leftKeys: _*)
+ .equalTo(nestedRightKeys: _*)
.`with`(joinFun)
+ .withForwardedFieldsSecond("f0->f1")
+ .name(joinOpName)
+
+ // create GroupReduceFunction to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val reduceFun = new RightOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // convert join pairs to result
+ // This step ensures we preserve the rows of the right input.
+ joinPairs
+ .groupBy("f1")
+ .reduceGroup(reduceFun)
.name(joinOpName)
+ .returns(resultType)
}
+
+ /**
+ * Translates a full outer join into a DataSet plan:
+ *
+ * 1. hash-partition both inputs on their join keys and sort them on all fields,
+ * 2. fold identical rows of both inputs into (row, count) records,
+ * 3. full outer join the folded inputs on the equi-join keys, using a [[FullOuterJoinRunner]]
+ * built from the generated join predicate; it produces
+ * (left, right, left count, right count) join pairs,
+ * 4. compute joined and left-preserved rows from the pairs that have a left row
+ * ([[LeftFullOuterJoinGroupReduceRunner]], grouped by left row),
+ * 5. compute right-preserved rows from the pairs that have a right row
+ * ([[RightFullOuterJoinGroupReduceRunner]], grouped by right row),
+ * 6. union the results of steps 4 and 5.
+ *
+ * @param left the left input
+ * @param right the right input
+ * @param leftKeys indices of the equi-join key fields of the left input
+ * @param rightKeys indices of the equi-join key fields of the right input
+ * @param resultType type of the produced result rows
+ * @param config table config used for code generation
+ * @return the result of the full outer join
+ * @throws TableException if null checks are disabled in the [[TableConfig]]
+ */
+ private def addFullOuterJoin(
+ left: DataSet[Row],
+ right: DataSet[Row],
+ leftKeys: Array[Int],
+ rightKeys: Array[Int],
+ resultType: TypeInformation[Row],
+ config: TableConfig): DataSet[Row] = {
+
+ if (!config.getNullCheck) {
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
+ }
+
+ val joinOpName = getJoinOpName
+
+ // replace field names by indexed names for easier key handling
+ val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+ val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
+
+ // partition and sort left and right input
+ // this step ensures we can reuse the sorting for all following operations
+ // (groupBy->join->groupBy), except the second grouping to preserve right rows.
+ val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
+ val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
+
+ // fold identical rows of the left and right input
+ // (each folded record holds the row in f0 and its duplicate count in f1)
+ val foldedRowsLeft: DataSet[Row] = foldIdenticalRows(partitionedSortedLeft, leftType)
+ val foldedRowsRight: DataSet[Row] = foldIdenticalRows(partitionedSortedRight, rightType)
+
+ // create JoinFunction to evaluate join predicate
+ val predFun = generatePredicateFunction(leftType, rightType, config)
+ // join pairs are (left row, right row, left duplicate count, right duplicate count)
+ val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT, Types.INT)
+ val joinFun = new FullOuterJoinRunner(predFun.name, predFun.code, joinOutType)
+
+ // join left and right inputs, evaluate join predicate, and emit join pairs
+ // the key fields of both inputs are nested in field f0 of the folded records
+ val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
+ val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
+ val joinPairs = foldedRowsLeft
+ .fullOuterJoin(foldedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
+ .where(nestedLeftKeys: _*)
+ .equalTo(nestedRightKeys: _*)
+ .`with`(joinFun)
+ .withForwardedFieldsFirst("f0->f0")
+ .withForwardedFieldsSecond("f0->f1")
+ .name(joinOpName)
+
+ // create GroupReduceFunctions to generate the join result
+ val convFun = generateConversionFunction(leftType, rightType, resultType, config)
+ val leftReduceFun = new LeftFullOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+ val rightReduceFun = new RightFullOuterJoinGroupReduceRunner(
+ convFun.name,
+ convFun.code,
+ convFun.returnType)
+
+ // compute joined (left + right) and left preserved (left + null)
+ val joinedAndLeftPreserved = joinPairs
+ // filter for pairs with left row
+ .filter(new FilterFunction[Row](){
+ override def filter(row: Row): Boolean = row.getField(0) != null})
+ .groupBy("f0")
+ .reduceGroup(leftReduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+
+ // compute right preserved (null + right)
+ val rightPreserved = joinPairs
+ // filter for pairs with right row
+ .filter(new FilterFunction[Row](){
+ override def filter(row: Row): Boolean = row.getField(1) != null})
+ .groupBy("f1")
+ .reduceGroup(rightReduceFun)
+ .name(joinOpName)
+ .returns(resultType)
+
+ // union joined (left + right), left preserved (left + null), and right preserved (null + right)
+ joinedAndLeftPreserved.union(rightPreserved)
+ }
+
+ /**
+ * Builds the name that is given to the join operators of this node. The name renders the join
+ * condition ("where") and the fields of the joined row type ("join").
+ */
+ private def getJoinOpName: String = {
+ s"where: (${joinConditionToString(joinRowType, joinCondition, getExpressionString)}), " +
+ s"join: (${joinSelectionToString(joinRowType)})"
+ }
+
+ /**
+ * Returns the indices of all `numFields` fields, ordered so that the given `keys` come first (in
+ * their given order), followed by all remaining field indices in ascending order.
+ *
+ * Example: keys = [2, 0], numFields = 4 => [2, 0, 1, 3]
+ *
+ * NOTE(review): assumes `keys` are distinct and within [0, numFields); duplicates would be
+ * repeated in the result.
+ */
+ private def getFullIndiciesWithPrefix(keys: Array[Int], numFields: Int): Array[Int] = {
+ // get indices of all fields which are not keys
+ val nonKeys = (0 until numFields).filter(!keys.contains(_))
+ // return all field indices prefixed by keys
+ keys ++ nonKeys
+ }
+
+ /**
+ * Hash-partitions the data set on the join keys and sorts each partition on all fields in
+ * ascending order, with the join keys as the leading sort fields.
+ *
+ * @param dataSet the data set to partition and sort
+ * @param partitionKeys indices of the fields to hash-partition on; they also form the sort prefix
+ * @return the partitioned data set, sorted within each partition
+ */
+ private def partitionAndSort(
+ dataSet: DataSet[Row],
+ partitionKeys: Array[Int]): DataSet[Row] = {
+
+ // construct full sort keys with partitionKeys being a prefix
+ val sortKeys = getFullIndiciesWithPrefix(partitionKeys, dataSet.getType.getArity)
+ // partition
+ val partitioned: DataSet[Row] = dataSet.partitionByHash(partitionKeys: _*)
+ // sort on all fields
+ // (one sortPartition call per sort key, applied in sortKeys order)
+ sortKeys.foldLeft(partitioned: DataSet[Row]) { (d, i) =>
+ d.sortPartition(i, Order.ASCENDING).asInstanceOf[DataSet[Row]]
+ }
+ }
+
+ /**
+ * Folds identical rows of a data set into a single row with a duplicate count.
+ *
+ * Each output record is a Row with two fields:
+ * - f0: one row of a group of identical input rows,
+ * - f1: the number of rows in that group (Int).
+ *
+ * @param dataSet the data set whose identical rows are folded
+ * @param dataSetType the row type of `dataSet`; all of its fields are used as grouping keys
+ * @return a data set of (row, count) records of type RowTypeInfo(dataSetType, INT)
+ */
+ private def foldIdenticalRows(
+ dataSet: DataSet[Row],
+ dataSetType: TypeInformation[Row]): DataSet[Row] = {
+
+ val resultType = new RowTypeInfo(dataSetType, Types.INT)
+ val groupKeys = 0 until dataSetType.getArity
+
+ dataSet
+ // group on all fields of the input row
+ .groupBy(groupKeys: _*)
+ // fold identical rows
+ .reduceGroup(new GroupReduceFunction[Row, Row] {
+ // output record, reused for every group
+ val outTuple = new Row(2)
+ override def reduce(values: Iterable[Row], out: Collector[Row]): Unit = {
+ // count number of duplicates
+ var cnt = 0
+ val it = values.iterator()
+ while (it.hasNext) {
+ // set output row (all rows of the group are identical, so any of them will do)
+ outTuple.setField(0, it.next())
+ cnt += 1
+ }
+ // set count
+ outTuple.setField(1, cnt)
+ // emit folded row with count
+ out.collect(outTuple)
+ }
+ })
+ .returns(resultType)
+ // the input row is forwarded unchanged into field f0 of the output
+ .withForwardedFields("*->f0")
+ .name("fold identical rows")
+ }
+
+ /**
+ * Generates a [[GeneratedFunction]] of a [[JoinFunction]] to evaluate the join predicate.
+ * The function returns the result of the predicate as [[JBool]].
+ *
+ * @param leftType type of the left input rows
+ * @param rightType type of the right input rows
+ * @param config table config passed to the code generator
+ * @return the generated predicate function, named "OuterJoinPredicate"
+ */
+ private def generatePredicateFunction(
+ leftType: TypeInformation[Row],
+ rightType: TypeInformation[Row],
+ config: TableConfig): GeneratedFunction[JoinFunction[Row, Row, JBool], JBool] = {
+ // NOTE(review): the second argument is `false` here and `true` in generateConversionFunction.
+ // It presumably marks whether whole input rows may be null. Confirm against FunctionCodeGenerator.
+ val predGenerator = new FunctionCodeGenerator(config, false, leftType, Some(rightType))
+ // the predicate is generated from the complete join condition, not only its non-equi part
+ val condition = predGenerator.generateExpression(joinCondition)
+ val predCode =
+ s"""
+ |${condition.code}
+ |return (${condition.resultTerm});
+ |""".stripMargin
+
+ predGenerator.generateFunction(
+ "OuterJoinPredicate",
+ classOf[JoinFunction[Row, Row, JBool]],
+ predCode,
+ Types.BOOLEAN)
+ }
+
+ /**
+ * Generates a [[GeneratedFunction]] of a [[JoinFunction]] to produce the join result.
+ * The function converts a (left, right) row pair into a result row of `resultType`.
+ *
+ * @param leftType type of the left input rows
+ * @param rightType type of the right input rows
+ * @param resultType type of the produced result rows
+ * @param config table config passed to the code generator
+ * @return the generated conversion function, named "OuterJoinConverter"
+ */
+ private def generateConversionFunction(
+ leftType: TypeInformation[Row],
+ rightType: TypeInformation[Row],
+ resultType: TypeInformation[Row],
+ config: TableConfig): GeneratedFunction[JoinFunction[Row, Row, Row], Row] = {
+
+ // NOTE(review): the second argument is `true` here. The group-reduce runners call this
+ // function with a null left or right row for null-padded results. Presumably the flag
+ // enables handling of such null input rows; confirm against FunctionCodeGenerator.
+ val conversionGenerator = new FunctionCodeGenerator(config, true, leftType, Some(rightType))
+ val conversion = conversionGenerator.generateConverterResultExpression(
+ resultType,
+ joinRowType.getFieldNames)
+ val convCode =
+ s"""
+ |${conversion.code}
+ |return ${conversion.resultTerm};
+ |""".stripMargin
+
+ conversionGenerator.generateFunction(
+ "OuterJoinConverter",
+ classOf[JoinFunction[Row, Row, Row]],
+ convCode,
+ resultType)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
index 33c4caf..869ab31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -77,7 +77,7 @@ private class FlinkLogicalJoinConverter
val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
val joinInfo = join.analyzeCondition
- hasEqualityPredicates(join, joinInfo) || isSingleRowJoin(join)
+ hasEqualityPredicates(joinInfo) || isSingleRowJoin(join)
}
override def convert(rel: RelNode): RelNode = {
@@ -95,10 +95,9 @@ private class FlinkLogicalJoinConverter
join.getJoinType)
}
- private def hasEqualityPredicates(join: LogicalJoin, joinInfo: JoinInfo): Boolean = {
+ private def hasEqualityPredicates(joinInfo: JoinInfo): Boolean = {
// joins require an equi-condition or a conjunctive predicate with at least one equi-condition
- // and disable outer joins with non-equality predicates(see FLINK-5520)
- !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+ !joinInfo.pairs().isEmpty
}
private def isSingleRowJoin(join: LogicalJoin): Boolean = {
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
index d880b35..eded45f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -41,8 +41,7 @@ class DataSetJoinRule
val joinInfo = join.analyzeCondition
// joins require an equi-condition or a conjunctive predicate with at least one equi-condition
- // and disable outer joins with non-equality predicates(see FLINK-5520)
- !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
+ !joinInfo.pairs().isEmpty
}
override def convert(rel: RelNode): RelNode = {
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinGroupReduceRunners.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinGroupReduceRunners.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinGroupReduceRunners.scala
new file mode 100644
index 0000000..9b0f08e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinGroupReduceRunners.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.{JoinFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Base class of the GroupReduceFunctions that turn the join pairs of an outer join into the
+ * final join result.
+ *
+ * In open(), the generated code of a [[JoinFunction]] is compiled and instantiated. That function
+ * converts a left row and a right row (either of which may be null) into a result row. It is
+ * stored in `function` for use by the reduce methods of the subclasses.
+ *
+ * @param name name of the generated function (passed to the compiler)
+ * @param code source code of the generated function
+ * @param returnType type of the produced result rows
+ */
+abstract class OuterJoinGroupReduceRunner(
+ name: String,
+ code: String,
+ // NOTE(review): returnType is not read within this class; the result type is set on the
+ // DataSet via .returns(...) by the caller.
+ @transient var returnType: TypeInformation[Row])
+ extends RichGroupReduceFunction[Row, Row]
+ with Compiler[JoinFunction[Row, Row, Row]] with Logging {
+
+ // generated conversion function; null until open() has been called
+ protected var function: JoinFunction[Row, Row, Row] = null
+
+ override def open(config: Configuration) {
+ LOG.debug(s"Compiling JoinFunction: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating JoinFunction.")
+ function = clazz.newInstance()
+ }
+}
+
+/**
+ * Emits the result of a left outer join for one group of join pairs.
+ *
+ * Each input pair is a Row with the fields
+ * - f0: the left row,
+ * - f1: a joining right row, or null,
+ * - f2: the duplicate count of the left row.
+ * All pairs of a group are expected to share the same left row.
+ *
+ * For every pair with a right row, the joined result is emitted as many times as the duplicate
+ * count. If no pair of the group has a right row, the left row is padded with nulls and emitted
+ * as many times as the duplicate count.
+ */
+class LeftOuterJoinGroupReduceRunner(
+ name: String,
+ code: String,
+ returnType: TypeInformation[Row])
+ extends OuterJoinGroupReduceRunner(name, code, returnType) {
+
+ override final def reduce(pairs: Iterable[Row], out: Collector[Row]): Unit = {
+
+ var needsNull = true
+ // left row and duplicate count are the same for all pairs of the group, so the values
+ // of the last pair are used for the null-padded result after the loop.
+ var left: Row = null
+ var dupCnt: Int = 0
+
+ val pairsIt = pairs.iterator()
+
+ // go over all joined pairs
+ while (pairsIt.hasNext) {
+
+ val pair = pairsIt.next()
+ left = pair.getField(0).asInstanceOf[Row]
+ dupCnt = pair.getField(2).asInstanceOf[Int]
+ val right = pair.getField(1).asInstanceOf[Row]
+
+ if (right != null) {
+ // we have a joining right record. Do not emit a null-padded result record.
+ needsNull = false
+ val result = function.join(left, right)
+ // emit as many result records as the duplication count of the left record
+ var i = dupCnt
+ while (i > 0) {
+ out.collect(result)
+ i -= 1
+ }
+ }
+ }
+
+ // we did not find a single joining right record. Emit null-padded result records.
+ if (needsNull) {
+ val result = function.join(left, null)
+ // emit as many null-padded result records as the duplication count of the left record.
+ while (dupCnt > 0) {
+ out.collect(result)
+ dupCnt -= 1
+ }
+ }
+ }
+}
+
+/**
+ * Emits the result of a right outer join for one group of join pairs.
+ *
+ * Each input pair is a Row with the fields
+ * - f0: a joining left row, or null,
+ * - f1: the right row,
+ * - f2: the duplicate count of the right row.
+ * All pairs of a group are expected to share the same right row.
+ *
+ * For every pair with a left row, the joined result is emitted as many times as the duplicate
+ * count. If no pair of the group has a left row, the right row is padded with nulls and emitted
+ * as many times as the duplicate count.
+ */
+class RightOuterJoinGroupReduceRunner(
+ name: String,
+ code: String,
+ returnType: TypeInformation[Row])
+ extends OuterJoinGroupReduceRunner(name, code, returnType) {
+
+ override final def reduce(pairs: Iterable[Row], out: Collector[Row]): Unit = {
+
+ var needsNull = true
+ // right row and duplicate count are the same for all pairs of the group, so the values
+ // of the last pair are used for the null-padded result after the loop.
+ var right: Row = null
+ var dupCnt: Int = 0
+
+ val pairsIt = pairs.iterator()
+
+ // go over all joined pairs
+ while (pairsIt.hasNext) {
+
+ val pair = pairsIt.next()
+ right = pair.getField(1).asInstanceOf[Row]
+ dupCnt = pair.getField(2).asInstanceOf[Int]
+ val left = pair.getField(0).asInstanceOf[Row]
+
+ if (left != null) {
+ // we have a joining left record. Do not emit a null-padded result record.
+ needsNull = false
+ val result = function.join(left, right)
+ // emit as many result records as the duplication count of the right record
+ var i = dupCnt
+ while (i > 0) {
+ out.collect(result)
+ i -= 1
+ }
+ }
+ }
+
+ // we did not find a single joining left record. Emit null-padded result records.
+ if (needsNull) {
+ val result = function.join(null, right)
+ // emit as many null-padded result records as the duplication count of the right record.
+ while (dupCnt > 0) {
+ out.collect(result)
+ dupCnt -= 1
+ }
+ }
+ }
+}
+
+/**
+ * Emits a part of the results of a full outer join:
+ *
+ * - join result from matching join pairs (left + right)
+ * - preserved left rows (left + null)
+ *
+ * Preserved right rows (null, right) are emitted by RightFullOuterJoinGroupReduceRunner.
+ *
+ * Each input pair is a Row with the fields
+ * - f0: the left row (expected to be non-null and the same for all pairs of a group),
+ * - f1: a joining right row, or null,
+ * - f2: the duplicate count of the left row,
+ * - f3: the duplicate count of the right row (only read when f1 is not null).
+ */
+class LeftFullOuterJoinGroupReduceRunner(
+ name: String,
+ code: String,
+ returnType: TypeInformation[Row])
+ extends OuterJoinGroupReduceRunner(name, code, returnType) {
+
+ override final def reduce(pairs: Iterable[Row], out: Collector[Row]): Unit = {
+
+ var needsNull = true
+ // left row and its duplicate count are the same for all pairs of the group
+ var left: Row = null
+ var leftDupCnt: Int = 0
+
+ val pairsIt = pairs.iterator()
+
+ // go over all joined pairs
+ while (pairsIt.hasNext) {
+
+ val pair = pairsIt.next()
+ left = pair.getField(0).asInstanceOf[Row]
+ leftDupCnt = pair.getField(2).asInstanceOf[Int]
+ val right = pair.getField(1).asInstanceOf[Row]
+
+ if (right != null) {
+ // we have a joining right record. Do not emit a null-padded result record.
+ needsNull = false
+ val rightDupCnt = pair.getField(3).asInstanceOf[Int]
+ // emit as many result records as the product of the duplication counts of left and right.
+ var i = leftDupCnt * rightDupCnt
+ val result = function.join(left, right)
+ while (i > 0) {
+ out.collect(result)
+ i -= 1
+ }
+ }
+ }
+
+ // we did not find a single joining right record. Emit null-padded result records.
+ if (needsNull) {
+ val result = function.join(left, null)
+ // emit as many null-padded result records as the duplication count of the left record.
+ while (leftDupCnt > 0) {
+ out.collect(result)
+ leftDupCnt -= 1
+ }
+ }
+ }
+}
+
+/**
+ * Emits a part of the results of a full outer join:
+ *
+ * - preserved right rows (null, right)
+ *
+ * Join result from matching join pairs (left + right) and preserved left rows (left + null) are
+ * emitted by LeftFullOuterJoinGroupReduceRunner.
+ *
+ * Each input pair is a Row with the fields
+ * - f0: a joining left row, or null,
+ * - f1: the right row (expected to be non-null and the same for all pairs of a group),
+ * - f3: the duplicate count of the right row.
+ */
+class RightFullOuterJoinGroupReduceRunner(
+ name: String,
+ code: String,
+ returnType: TypeInformation[Row])
+ extends OuterJoinGroupReduceRunner(name, code, returnType) {
+
+ override final def reduce(pairs: Iterable[Row], out: Collector[Row]): Unit = {
+
+ var needsNull = true
+ var right: Row = null
+ var rightDupCnt: Int = 0
+
+ val pairsIt = pairs.iterator()
+
+ // go over all joined pairs
+ // (stop as soon as one joining left row is found; the remaining pairs do not matter)
+ while (pairsIt.hasNext && needsNull) {
+
+ val pair = pairsIt.next()
+ right = pair.getField(1).asInstanceOf[Row]
+ rightDupCnt = pair.getField(3).asInstanceOf[Int]
+ val left = pair.getField(0).asInstanceOf[Row]
+
+ if (left != null) {
+ // we have a joining left record. Do not emit a null-padded result record.
+ needsNull = false
+ // we do NOT emit join results here. This was done by LeftFullOuterJoinGroupReduceRunner.
+ }
+ }
+
+ // we did not find a single joining left record. Emit null-padded result records.
+ if (needsNull) {
+ val result = function.join(null, right)
+ // emit as many null-padded result records as the duplication count of the right record.
+ while (rightDupCnt > 0) {
+ out.collect(result)
+ rightDupCnt -= 1
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinRunners.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinRunners.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinRunners.scala
new file mode 100644
index 0000000..a9e0211
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/outerJoinRunners.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.functions.{JoinFunction, RichFlatJoinFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Base class of the outer join runners in this file. On [[open]] it compiles the generated
+ * code into a `JoinFunction[Row, Row, JBool]` that subclasses use as the join predicate.
+ *
+ * @param name       name handed to the compiler (presumably the generated class name)
+ * @param code       generated source code to compile
+ * @param returnType type reported by [[getProducedType]]
+ */
+abstract class OuterJoinRunner(
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[Row])
+  extends RichFlatJoinFunction[Row, Row, Row]
+  with ResultTypeQueryable[Row]
+  with Compiler[JoinFunction[Row, Row, JBool]]
+  with Logging {
+
+  /** The compiled join predicate; null until [[open]] has run. */
+  protected var function: JoinFunction[Row, Row, JBool] = _
+
+  override def open(parameters: Configuration): Unit = {
+    // NOTE(review): the log text says "FlatJoinFunction", but the compiled class is the
+    // JoinFunction[Row, Row, JBool] predicate. Kept as-is so log output does not change.
+    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
+    val predicateClass = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatJoinFunction.")
+    function = predicateClass.newInstance()
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}
+
+/**
+ * Evaluates the join predicate for one left outer join candidate pair and emits a 3-field row
+ * `(left, right or null, leftCnt)`.
+ *
+ * The left input carries the actual left row at position 0 and a count field at position 1,
+ * which is forwarded unchanged. The right row is kept only if it is non-null and the predicate
+ * holds for it; otherwise null is emitted in its place, so the left row is never dropped.
+ */
+class LeftOuterJoinRunner(
+    name: String,
+    code: String,
+    returnType: TypeInformation[Row])
+  extends OuterJoinRunner(name, code, returnType) {
+
+  // Reused for every emitted pair.
+  val outRow = new Row(3)
+
+  override final def join(leftWithCnt: Row, right: Row, out: Collector[Row]): Unit = {
+    val left = leftWithCnt.getField(0).asInstanceOf[Row]
+
+    outRow.setField(0, left)
+    outRow.setField(2, leftWithCnt.getField(1))
+
+    // The predicate is only evaluated when there is a right row to test.
+    val keepRight: Boolean = right != null && function.join(left, right)
+    outRow.setField(1, if (keepRight) right else null)
+
+    out.collect(outRow)
+  }
+}
+
+/**
+ * Evaluates the join predicate for one right outer join candidate pair and emits a 3-field row
+ * `(left or null, right, rightCnt)`.
+ *
+ * The right input carries the actual right row at position 0 and a count field at position 1,
+ * which is forwarded unchanged. The left row is kept only if it is non-null and the predicate
+ * holds for it; otherwise null is emitted in its place, so the right row is never dropped.
+ */
+class RightOuterJoinRunner(
+    name: String,
+    code: String,
+    returnType: TypeInformation[Row])
+  extends OuterJoinRunner(name, code, returnType) {
+
+  // Reused for every emitted pair.
+  val outRow = new Row(3)
+
+  override final def join(left: Row, rightWithCnt: Row, out: Collector[Row]): Unit = {
+    val right = rightWithCnt.getField(0).asInstanceOf[Row]
+
+    outRow.setField(1, right)
+    outRow.setField(2, rightWithCnt.getField(1))
+
+    // The predicate is only evaluated when there is a left row to test.
+    val keepLeft: Boolean = left != null && function.join(left, right)
+    outRow.setField(0, if (keepLeft) left else null)
+
+    out.collect(outRow)
+  }
+}
+
+/**
+ * Evaluates the join predicate for one full outer join candidate pair and emits 4-field rows
+ * `(left, right, leftCnt, rightCnt)`.
+ *
+ * Each non-null input carries its actual row at position 0 and a count field at position 1.
+ * A row is preserved, padded with null for the missing side, whenever no matching row exists
+ * on the OTHER side: either the other input is null or the predicate evaluates to false. If
+ * both inputs are present but the predicate is false, two null-padded pairs are emitted.
+ */
+class FullOuterJoinRunner(
+    name: String,
+    code: String,
+    returnType: TypeInformation[Row])
+  extends OuterJoinRunner(name, code, returnType) {
+
+  // Reused for every emitted pair.
+  val outRow = new Row(4)
+
+  override final def join(leftWithCnt: Row, rightWithCnt: Row, out: Collector[Row]): Unit = {
+    if (leftWithCnt == null) {
+      // no left row: preserve the right row
+      emitPair(null, rowOf(rightWithCnt), null, cntOf(rightWithCnt), out)
+    } else if (rightWithCnt == null) {
+      // no right row: preserve the left row
+      emitPair(rowOf(leftWithCnt), null, cntOf(leftWithCnt), null, out)
+    } else {
+      val left = rowOf(leftWithCnt)
+      val leftCnt = cntOf(leftWithCnt)
+      val right = rowOf(rightWithCnt)
+      val rightCnt = cntOf(rightWithCnt)
+
+      if (function.join(left, right)) {
+        emitPair(left, right, leftCnt, rightCnt, out)
+      } else {
+        // predicate failed: preserve both rows as separate null-padded pairs
+        emitPair(left, null, leftCnt, null, out)
+        emitPair(null, right, null, rightCnt, out)
+      }
+    }
+  }
+
+  /** Extracts the actual row (position 0) from an input row of the form `(row, count)`. */
+  private def rowOf(withCnt: Row): Row = withCnt.getField(0).asInstanceOf[Row]
+
+  /** Extracts the count field (position 1) from an input row of the form `(row, count)`. */
+  private def cntOf(withCnt: Row): AnyRef = withCnt.getField(1)
+
+  /** Fills the reused output row with the given pair and emits it. */
+  private def emitPair(
+      left: Row,
+      right: Row,
+      leftCnt: AnyRef,
+      rightCnt: AnyRef,
+      out: Collector[Row]): Unit = {
+    outRow.setField(0, left)
+    outRow.setField(1, right)
+    outRow.setField(2, leftCnt)
+    outRow.setField(3, rightCnt)
+    out.collect(outRow)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/JoinTest.scala
new file mode 100644
index 0000000..a3a597f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/JoinTest.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+ // Checks the DataSet plans of batch SQL LEFT/RIGHT/FULL OUTER JOINs with various predicates.
+ @Test
+ def testLeftOuterJoinEquiPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "=(a, z)"),
+ term("join", "a", "b", "y", "z"),
+ term("joinType", "LeftOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(result, expected)
+ }
+ // Local predicate on the left input (b < 2) is expected as a projected field $f3.
+ @Test
+ def testLeftOuterJoinEquiAndLocalPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z AND b < 2"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "<(b, 2) AS $f3")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "AND(=(a, z), $f3)"),
+ term("join", "a", "b", "$f3", "y", "z"),
+ term("joinType", "LeftOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(result, expected)
+ }
+ // Non-equi predicate (b < x): right input is expected unprojected (x, y, z are all used).
+ @Test
+ def testLeftOuterJoinEquiAndNonEquiPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, y FROM t LEFT OUTER JOIN s ON a = z AND b < x"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ batchTableNode(1),
+ term("where", "AND(=(a, z), <(b, x))"),
+ term("join", "a", "b", "x", "y", "z"),
+ term("joinType", "LeftOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(result, expected)
+ }
+ // Equi-join only: both inputs are expected to be pruned to the referenced fields.
+ @Test
+ def testRightOuterJoinEquiPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, y FROM t RIGHT OUTER JOIN s ON a = z"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "=(a, z)"),
+ term("join", "a", "b", "y", "z"),
+ term("joinType", "RightOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(result, expected)
+ }
+ // Local predicate on the right input (x < 2) is expected as a projected field $f3.
+ @Test
+ def testRightOuterJoinEquiAndLocalPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, x FROM t RIGHT OUTER JOIN s ON a = z AND x < 2"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "x", "z", "<(x, 2) AS $f3")
+ ),
+ term("where", "AND(=(a, z), $f3)"),
+ term("join", "a", "b", "x", "z", "$f3"),
+ term("joinType", "RightOuterJoin")
+ ),
+ term("select", "b", "x")
+ )
+
+ util.verifyTable(result, expected)
+ }
+ // Non-equi predicate (b < x) is expected to stay in the join condition.
+ @Test
+ def testRightOuterJoinEquiAndNonEquiPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, y FROM t RIGHT OUTER JOIN s ON a = z AND b < x"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ batchTableNode(1),
+ term("where", "AND(=(a, z), <(b, x))"),
+ term("join", "a", "b", "x", "y", "z"),
+ term("joinType", "RightOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(result, expected)
+ }
+ // Equi-join only, FULL OUTER JOIN variant.
+ @Test
+ def testFullOuterJoinEquiPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, y FROM t FULL OUTER JOIN s ON a = z"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "=(a, z)"),
+ term("join", "a", "b", "y", "z"),
+ term("joinType", "FullOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(result, expected)
+ }
+ // Local predicates on both inputs; the right side's $f3 is expected as $f30 in the join.
+ @Test
+ def testFullOuterJoinEquiAndLocalPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, y FROM t FULL OUTER JOIN s ON a = z AND b < 2 AND z > 5"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "<(b, 2) AS $f3")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z", ">(z, 5) AS $f3")
+ ),
+ term("where", "AND(=(a, z), $f3, $f30)"),
+ term("join", "a", "b", "$f3", "y", "z", "$f30"),
+ term("joinType", "FullOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(result, expected)
+ }
+ // Non-equi predicate (b < x) in a FULL OUTER JOIN is expected to stay in the join condition.
+ @Test
+ def testFullOuterJoinEquiAndNonEquiPred(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("t", 'a, 'b, 'c)
+ util.addTable[(Long, String, Int)]("s", 'x, 'y, 'z)
+
+ val query = "SELECT b, y FROM t FULL OUTER JOIN s ON a = z AND b < x"
+ val result = util.tableEnv.sqlQuery(query)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ batchTableNode(1),
+ term("where", "AND(=(a, z), <(b, x))"),
+ term("join", "a", "b", "x", "y", "z"),
+ term("joinType", "FullOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
index d9e0e10..628bf5f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
@@ -83,67 +83,34 @@ class JoinValidationTest extends TableTestBase {
}
@Test(expected = classOf[TableException])
- def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+ def testRightOuterJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
- val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
+ val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b < e" // no equi-join predicate
util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
- def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+ def testLeftOuterJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
- val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
+ val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b > e" // no equi-join predicate
util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
@Test(expected = classOf[TableException])
- def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
+ def testFullOuterJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
- val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
-
- util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
- }
-
- @Test(expected = classOf[TableException])
- def testRightOuterJoinWithLocalPredicate(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
- util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
-
- util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
- }
-
- @Test(expected = classOf[TableException])
- def testLeftOuterJoinWithLocalPredicate(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
- util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
-
- util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
- }
-
- @Test(expected = classOf[TableException])
- def testFullOuterJoinWithLocalPredicate(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
- util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
+ val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b <> e" // no equi-join predicate
util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
new file mode 100644
index 0000000..9ee7fc2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class JoinTest extends TableTestBase {
+ // Checks the DataSet plans of Table API left/right/full outer joins with various predicates.
+ @Test
+ def testLeftOuterJoinEquiPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.leftOuterJoin(s, 'a === 'z).select('b, 'y)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "=(a, z)"),
+ term("join", "a", "b", "y", "z"),
+ term("joinType", "LeftOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+ // The local predicate b < 2 is expected to remain in the join condition.
+ @Test
+ def testLeftOuterJoinEquiAndLocalPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.leftOuterJoin(s, 'a === 'z && 'b < 2).select('b, 'y)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "AND(=(a, z), <(b, 2))"),
+ term("join", "a", "b", "y", "z"),
+ term("joinType", "LeftOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+ // Non-equi predicate (b < x): right input is expected unprojected (x, y, z are all used).
+ @Test
+ def testLeftOuterJoinEquiAndNonEquiPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.leftOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ batchTableNode(1),
+ term("where", "AND(=(a, z), <(b, x))"),
+ term("join", "a", "b", "x", "y", "z"),
+ term("joinType", "LeftOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+ // Equi-join only: both inputs are expected to be pruned to the referenced fields.
+ @Test
+ def testRightOuterJoinEquiPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.rightOuterJoin(s, 'a === 'z).select('b, 'y)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "=(a, z)"),
+ term("join", "a", "b", "y", "z"),
+ term("joinType", "RightOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+ // The local predicate x < 2 is expected to remain in the join condition.
+ @Test
+ def testRightOuterJoinEquiAndLocalPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.rightOuterJoin(s, 'a === 'z && 'x < 2).select('b, 'x)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "x", "z")
+ ),
+ term("where", "AND(=(a, z), <(x, 2))"),
+ term("join", "a", "b", "x", "z"),
+ term("joinType", "RightOuterJoin")
+ ),
+ term("select", "b", "x")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+ // Non-equi predicate (b < x) is expected to stay in the join condition.
+ @Test
+ def testRightOuterJoinEquiAndNonEquiPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.rightOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ batchTableNode(1),
+ term("where", "AND(=(a, z), <(b, x))"),
+ term("join", "a", "b", "x", "y", "z"),
+ term("joinType", "RightOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+ // Equi-join only, full outer join variant.
+ @Test
+ def testFullOuterJoinEquiPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.fullOuterJoin(s, 'a === 'z).select('b, 'y)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "=(a, z)"),
+ term("join", "a", "b", "y", "z"),
+ term("joinType", "FullOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+ // The local predicate b < 2 is expected to remain in the join condition.
+ @Test
+ def testFullOuterJoinEquiAndLocalPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.fullOuterJoin(s, 'a === 'z && 'b < 2).select('b, 'y)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "y", "z")
+ ),
+ term("where", "AND(=(a, z), <(b, 2))"),
+ term("join", "a", "b", "y", "z"),
+ term("joinType", "FullOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+ // Non-equi predicate (b < x) in a full outer join is expected to stay in the join condition.
+ @Test
+ def testFullOuterJoinEquiAndNonEquiPred(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Int, Long, String)]("T", 'a, 'b, 'c)
+ val s = util.addTable[(Long, String, Int)]("S", 'x, 'y, 'z)
+
+ val joined = t.fullOuterJoin(s, 'a === 'z && 'b < 'x).select('b, 'y)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b")
+ ),
+ batchTableNode(1),
+ term("where", "AND(=(a, z), <(b, x))"),
+ term("join", "a", "b", "x", "y", "z"),
+ term("joinType", "FullOuterJoin")
+ ),
+ term("select", "b", "y")
+ )
+
+ util.verifyTable(joined, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
index 3cc278b..e2ecd38 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/JoinValidationTest.scala
@@ -92,16 +92,7 @@ class JoinValidationTest extends TableTestBase {
}
@Test(expected = classOf[ValidationException])
- def testNoJoinCondition(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- def testNoEquiJoin(): Unit = {
+ def testLeftJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
@@ -110,57 +101,21 @@ class JoinValidationTest extends TableTestBase {
}
@Test(expected = classOf[ValidationException])
- def testRightJoinWithNonEquiJoinPredicate(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- def testLeftJoinWithLocalPredicate(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- def testFullJoinWithLocalPredicate(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- def testRightJoinWithLocalPredicate(): Unit = {
- val util = batchTestUtil()
- val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
- val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
- ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)
- }
-
- @Test(expected = classOf[ValidationException])
- def testLeftJoinWithNonEquiJoinPredicate(): Unit = {
+ def testRightJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
- ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+ ds2.rightOuterJoin(ds1, 'b < 'd).select('c, 'g) // no equi-join predicate
}
@Test(expected = classOf[ValidationException])
- def testFullJoinWithNonEquiJoinPredicate(): Unit = {
+ def testFullJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
- ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+ ds2.fullOuterJoin(ds1, 'b < 'd).select('c, 'g) // no equi-join predicate
}
@Test(expected = classOf[ValidationException])
http://git-wip-us.apache.org/repos/asf/flink/blob/2a638155/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
index ddf622c..20348c4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
@@ -18,16 +18,21 @@
package org.apache.flink.table.runtime.batch.table
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.MapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.utils.TableFunc2
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -36,8 +41,9 @@ import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
class JoinITCase(
+ execMode: TestExecutionMode,
configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
+ extends TableProgramsClusterTestBase(execMode, configMode) {
@Test
def testJoin(): Unit = {
@@ -106,8 +112,10 @@ class JoinITCase(
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val ds1 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
@@ -214,8 +222,10 @@ class JoinITCase(
val tEnv = TableEnvironment.getTableEnvironment(env, config)
tEnv.getConfig.setNullCheck(true)
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val ds1 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
@@ -225,38 +235,144 @@ class JoinITCase(
"Comment#3,null\n" + "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
"Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" + "Comment#10,null\n" +
"Comment#11,null\n" + "Comment#12,null\n" + "Comment#13,null\n" + "Comment#14,null\n" +
- "Comment#15,null\n"
+ "Comment#15,null\n" +
+ "NullTuple,null\n" + "NullTuple,null\n"
val results = joinT.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
+ def testLeftJoinWithNonEquiJoinPred(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g)
+
+ val expected = Seq(
+ "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC",
+ "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK",
+ "Hello world, how are you?,null", "Luke Skywalker,null", "Comment#1,null", "Comment#2,null",
+ "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", "Comment#7,null",
+ "Comment#8,null", "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null",
+ "Comment#13,null", "Comment#14,null", "Comment#15,null",
+ "NullTuple,null", "NullTuple,null")
+ val results = joinT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+ }
+
+ @Test
+ def testLeftJoinWithLeftLocalPred(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 2).select('c, 'g)
+
+ val expected = Seq(
+ "Hello,Hallo Welt", "Hello,Hallo Welt wie",
+ "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", "Hello world,BCD",
+ "Hi,null", "Hello world, how are you?,null", "I am fine.,null", "Luke Skywalker,null",
+ "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", "Comment#5,null",
+ "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null",
+ "Comment#11,null", "Comment#12,null", "Comment#13,null", "Comment#14,null", "Comment#15,null",
+ "NullTuple,null", "NullTuple,null")
+ val results = joinT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+ }
+
+ @Test
def testRightJoinWithMultipleKeys(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
tEnv.getConfig.setNullCheck(true)
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val ds1 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
"Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
"null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "I am fine.,HIJ\n" +
- "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n"
+ "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n" +
+ "null,NullTuple\n" + "null,NullTuple\n"
val results = joinT.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
+ def testRightJoinWithNonEquiJoinPred(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val ds2 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+
+ val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g)
+
+ val expected = Seq(
+ "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC",
+ "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK",
+ "Hello world, how are you?,null", "Luke Skywalker,null", "Comment#1,null", "Comment#2,null",
+ "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", "Comment#7,null",
+ "Comment#8,null", "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null",
+ "Comment#13,null", "Comment#14,null", "Comment#15,null",
+ "NullTuple,null", "NullTuple,null")
+ val results = joinT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+ }
+
+ @Test
+ def testRightJoinWithRightLocalPred(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val ds2 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ // 'b is a column of ds2, the preserved (right) input: a right-local predicate
+ val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 2).select('c, 'g)
+
+ val expected = Seq(
+ "Hello,Hallo Welt", "Hello,Hallo Welt wie",
+ "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", "Hello world,BCD",
+ "Hi,null", "Hello world, how are you?,null", "I am fine.,null", "Luke Skywalker,null",
+ "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", "Comment#5,null",
+ "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null",
+ "Comment#11,null", "Comment#12,null", "Comment#13,null", "Comment#14,null", "Comment#15,null",
+ "NullTuple,null", "NullTuple,null")
+ val results = joinT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+ }
+
+ @Test
def testFullOuterJoinWithMultipleKeys(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
tEnv.getConfig.setNullCheck(true)
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val ds1 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
@@ -268,12 +384,74 @@ class JoinITCase(
"Comment#5,null\n" + "Comment#6,null\n" + "Comment#7,null\n" + "Comment#8,null\n" +
"Comment#9,null\n" + "Comment#10,null\n" + "Comment#11,null\n" + "Comment#12,null\n" +
"Comment#13,null\n" + "Comment#14,null\n" + "Comment#15,null\n" +
- "Hello world, how are you?,null\n"
+ "Hello world, how are you?,null\n" +
+ "NullTuple,null\n" + "NullTuple,null\n" + "null,NullTuple\n" + "null,NullTuple\n"
val results = joinT.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@Test
+ def testFullJoinWithNonEquiJoinPred(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b <= 'h).select('c, 'g)
+
+ val expected = Seq(
+ // join matches
+ "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC",
+ "Hello world,BCD", "I am fine.,HIJ", "I am fine.,IJK",
+ // preserved left
+ "Hello world, how are you?,null", "Luke Skywalker,null", "Comment#1,null", "Comment#2,null",
+ "Comment#3,null", "Comment#4,null", "Comment#5,null", "Comment#6,null", "Comment#7,null",
+ "Comment#8,null", "Comment#9,null", "Comment#10,null", "Comment#11,null", "Comment#12,null",
+ "Comment#13,null", "Comment#14,null", "Comment#15,null", "NullTuple,null", "NullTuple,null",
+ // preserved right
+ "null,Hallo Welt wie", "null,CDE", "null,DEF", "null,EFG", "null,FGH", "null,GHI", "null,JKL",
+ "null,KLM", "null,NullTuple", "null,NullTuple")
+ val results = joinT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+ }
+
+ @Test
+ def testFullJoinWithLeftLocalPred(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = addNullKey3Tuples(
+ CollectionDataSets.get3TupleDataSet(env)).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = addNullKey5Tuples(
+ CollectionDataSets.get5TupleDataSet(env)).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b >= 2 && 'h === 1).select('c, 'g)
+
+ val expected = Seq(
+ // join matches
+ "Hello,Hallo Welt wie", "Hello world, how are you?,DEF", "Hello world, how are you?,EFG",
+ "I am fine.,GHI",
+ // preserved left
+ "Hi,null", "Hello world,null", "Luke Skywalker,null",
+ "Comment#1,null", "Comment#2,null", "Comment#3,null", "Comment#4,null", "Comment#5,null",
+ "Comment#6,null", "Comment#7,null", "Comment#8,null", "Comment#9,null", "Comment#10,null",
+ "Comment#11,null", "Comment#12,null", "Comment#13,null", "Comment#14,null", "Comment#15,null",
+ "NullTuple,null", "NullTuple,null",
+ // preserved right
+ "null,Hallo", "null,Hallo Welt", "null,Hallo Welt wie gehts?", "null,ABC", "null,BCD",
+ "null,CDE", "null,FGH", "null,HIJ", "null,IJK", "null,JKL", "null,KLM",
+ "null,NullTuple", "null,NullTuple")
+
+ val results = joinT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected.mkString("\n"))
+ }
+
+ @Test
def testUDTFJoinOnTuples(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -295,4 +473,40 @@ class JoinITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ private def addNullKey3Tuples(rows: DataSet[(Int, Long, String)]) = {
+ rows.mapPartition(
+ new MapPartitionFunction[(Int, Long, String), (Integer, Long, String)] {
+ // forwards every row with its Int key boxed, then appends two rows with a null key
+ override def mapPartition(
+ vals: Iterable[(Int, Long, String)],
+ out: Collector[(Integer, Long, String)]): Unit = {
+ val it = vals.iterator()
+ while (it.hasNext) {
+ val v = it.next()
+ out.collect((int2Integer(v._1), v._2, v._3))
+ }
+ out.collect((null.asInstanceOf[Integer], 999L, "NullTuple"))
+ out.collect((null.asInstanceOf[Integer], 999L, "NullTuple"))
+ }
+ }).setParallelism(1) // one instance => exactly two null-key rows in total
+ }
+
+ private def addNullKey5Tuples(rows: DataSet[(Int, Long, Int, String, Long)]) = {
+ rows.mapPartition(
+ new MapPartitionFunction[(Int, Long, Int, String, Long), (Integer, Long, Int, String, Long)] {
+ // forwards every row with its Int key boxed, then appends two rows with a null key
+ override def mapPartition(
+ vals: Iterable[(Int, Long, Int, String, Long)],
+ out: Collector[(Integer, Long, Int, String, Long)]): Unit = {
+ val it = vals.iterator()
+ while (it.hasNext) {
+ val v = it.next()
+ out.collect((int2Integer(v._1), v._2, v._3, v._4, v._5))
+ }
+ out.collect((null.asInstanceOf[Integer], 999L, 999, "NullTuple", 999L))
+ out.collect((null.asInstanceOf[Integer], 999L, 999, "NullTuple", 999L))
+ }
+ }).setParallelism(1) // one instance => exactly two null-key rows in total
+ }
+
}
[2/3] flink git commit: [hotfix] [docs] Synchronize the Table API and
SQL documentation of time-windowed joins.
Posted by fh...@apache.org.
[hotfix] [docs] Synchronize the Table API and SQL documentation of time-windowed joins.
This closes #4874.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7da886a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7da886a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7da886a6
Branch: refs/heads/master
Commit: 7da886a666ab2dcfcf4c0c69a04cd6ee34f7e4fc
Parents: 2a63815
Author: Xingcan Cui <xi...@gmail.com>
Authored: Fri Oct 20 22:52:52 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 24 20:28:46 2017 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 10 +++++-----
docs/dev/table/tableApi.md | 8 ++++----
2 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7da886a6/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 8a86f72..0eef48b 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -400,15 +400,15 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires a special join condition that bounds the time on both sides. This can be done by either two appropriate range predicates (<code> <, <=, >=, ></code>) or a <code>BETWEEN</code> predicate that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+ <p>A time-windowed join requires at least one equi-join predicate and a special join
+ condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
<ul>
- <li>Time predicates must compare time attributes of both input tables.</li>
- <li>Time predicates must compare only time attributes of the same type, i.e., processing time with processing time or event time with event time.</li>
- <li>Only range predicates are valid time predicates.</li>
+ <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
+ <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
</ul>
</p>
- <p><b>Note:</b> Currently, only <code>INNER</code> joins are supported.</p>
+ <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
{% highlight sql %}
SELECT *
http://git-wip-us.apache.org/repos/asf/flink/blob/7da886a6/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 9380025..07b301d 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -527,9 +527,9 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code> <, <=, >=, ></code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+ <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
<ul>
- <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li>
+ <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
<li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
</ul>
</p>
@@ -644,9 +644,9 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
<td>
<p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
- <p>A time-windowed join requires an equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code> <, <=, >=, ></code>) that compare the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
+ <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code><, <=, >=, ></code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
<ul>
- <li>The time attribute of a stream must be compared to a bounded interval on a time attribute of the opposite stream.</li>
+ <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
<li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
</ul>
</p>
[3/3] flink git commit: [FLINK-7821] [table] Deprecate Table.limit()
and replace it by Table.offset() and Table.fetch().
Posted by fh...@apache.org.
[FLINK-7821] [table] Deprecate Table.limit() and replace it by Table.offset() and Table.fetch().
This closes #4813.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1c7e490
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1c7e490
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1c7e490
Branch: refs/heads/master
Commit: e1c7e490395c44913e816966b2fa3b8bfeb18160
Parents: 7da886a
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 12 10:56:19 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 24 20:28:46 2017 +0200
----------------------------------------------------------------------
docs/dev/table/tableApi.md | 42 +++++++------
.../org/apache/flink/table/api/table.scala | 66 +++++++++++++++++++-
.../table/validation/SortValidationTest.scala | 34 +++++++++-
.../validation/CorrelateValidationTest.scala | 4 +-
.../UnsupportedOpsValidationTest.scala | 12 +++-
.../table/runtime/batch/table/SortITCase.scala | 6 +-
6 files changed, 135 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 07b301d..f0c4605 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -959,7 +959,7 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
{% top %}
-### OrderBy & Limit
+### OrderBy, Offset & Fetch
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -987,19 +987,22 @@ Table result = in.orderBy("a.asc");
<tr>
<td>
- <strong>Limit</strong><br>
+ <strong>Offset & Fetch</strong><br>
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+
+// returns the first 5 records from the sorted result
+Table result1 = in.orderBy("a.asc").fetch(5);
+
+// skips the first 3 records and returns all following records from the sorted result
+Table result2 = in.orderBy("a.asc").offset(3);
+
+// skips the first 10 records and returns the next 5 records from the sorted result
+Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
{% endhighlight %}
</td>
</tr>
@@ -1033,19 +1036,22 @@ val result = in.orderBy('a.asc);
<tr>
<td>
- <strong>Limit</strong><br>
+ <strong>Offset & Fetch</strong><br>
<span class="label label-primary">Batch</span>
</td>
<td>
- <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+ <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
{% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc).limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight scala %}
-val in = ds.toTable(tableEnv, 'a, 'b, 'c);
-val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with the 4th record
+val in = ds.toTable(tableEnv, 'a, 'b, 'c)
+
+// returns the first 5 records from the sorted result
+val result1: Table = in.orderBy('a.asc).fetch(5)
+
+// skips the first 3 records and returns all following records from the sorted result
+val result2: Table = in.orderBy('a.asc).offset(3)
+
+// skips the first 10 records and returns the next 5 records from the sorted result
+val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)
{% endhighlight %}
</td>
</tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 30ed98e..0430e49 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -25,7 +25,6 @@ import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionPar
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.plan.ProjectionTranslator._
import org.apache.flink.table.plan.logical.{Minus, _}
-import org.apache.flink.table.plan.schema.TableSinkTable
import org.apache.flink.table.sinks.TableSink
import _root_.scala.annotation.varargs
@@ -721,12 +720,16 @@ class Table(
* Example:
*
* {{{
- * // returns unlimited number of records beginning with the 4th record
+ * // skips the first 3 rows and returns all following rows.
* tab.orderBy('name.desc).limit(3)
* }}}
*
* @param offset number of records to skip
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @Deprecated
+ @deprecated(message = "Deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int): Table = {
new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
}
@@ -739,18 +742,75 @@ class Table(
* Example:
*
* {{{
- * // returns 5 records beginning with the 4th record
+ * // skips the first 3 rows and returns the next 5 rows.
* tab.orderBy('name.desc).limit(3, 5)
* }}}
*
* @param offset number of records to skip
* @param fetch number of records to be returned
+ *
+ * @deprecated Please use [[Table.offset()]] and [[Table.fetch()]] instead.
*/
+ @Deprecated
+ @deprecated(message = "Deprecated in favor of Table.offset() and Table.fetch()", since = "1.4.0")
def limit(offset: Int, fetch: Int): Table = {
new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
}
/**
+ * Limits a sorted result from an offset position.
+ * Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.offset(o)]] can be combined with a subsequent [[Table.fetch(n)]] call to return n rows
+ * after skipping the first o rows.
+ *
+ * {{{
+ * // skips the first 3 rows and returns all following rows.
+ * tab.orderBy('name.desc).offset(3)
+ * // skips the first 10 rows and returns the next 5 rows.
+ * tab.orderBy('name.desc).offset(10).fetch(5)
+ * }}}
+ *
+ * @param offset number of records to skip
+ */
+ def offset(offset: Int): Table = {
+ new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+ }
+
+ /**
+ * Limits a sorted result to the first n rows.
+ * Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and
+ * thus must be preceded by it.
+ *
+ * [[Table.fetch(n)]] can be combined with a preceding [[Table.offset(o)]] call to return n rows
+ * after skipping the first o rows.
+ *
+ * {{{
+ * // returns the first 3 records.
+ * tab.orderBy('name.desc).fetch(3)
+ * // skips the first 10 rows and returns the next 5 rows.
+ * tab.orderBy('name.desc).offset(10).fetch(5)
+ * }}}
+ *
+ * @param fetch the number of records to return. Fetch must be >= 0.
+ */
+ def fetch(fetch: Int): Table = {
+ if (fetch < 0) {
+ throw ValidationException("FETCH count must be equal or larger than 0.")
+ }
+ this.logicalPlan match {
+ case Limit(o, -1, c) =>
+ // replace LIMIT without FETCH by LIMIT with FETCH
+ new Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))
+ case Limit(_, _, _) =>
+ throw ValidationException("FETCH is already defined.")
+ case _ =>
+ new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))
+ }
+ }
+
+ /**
* Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
*
* A batch [[Table]] can only be written to a
http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
index 5064687..24156e4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/SortValidationTest.scala
@@ -27,10 +27,42 @@ import org.junit._
class SortValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
+ def testOffsetWithoutOrder(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ ds.offset(5)
+ }
+
+ @Test(expected = classOf[ValidationException])
def testFetchWithoutOrder(): Unit = {
val util = batchTestUtil()
val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
- ds.limit(0, 5)
+ ds.fetch(5)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testFetchBeforeOffset(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ ds.orderBy('a.asc).fetch(5).offset(10)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testOffsetBeforeOffset(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+ ds.orderBy('a.asc).offset(10).offset(5)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testNegativeFetch(): Unit = {
+ val util = batchTestUtil()
+ val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+ // Table.fetch() rejects a negative count with a ValidationException
+ ds.orderBy('a.asc).fetch(-1)
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index c754afd..8f429e1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -86,13 +86,13 @@ class CorrelateValidationTest extends TableTestBase {
// table function call limit
expectExceptionThrown(
- func1('c).orderBy('f0).limit(3),
+ func1('c).orderBy('f0).offset(3),
"TableFunction can only be used in join and leftOuterJoin."
)
// table function call limit
expectExceptionThrown(
- func1('c).orderBy('f0).limit(0, 3),
+ func1('c).orderBy('f0).fetch(3),
"TableFunction can only be used in join and leftOuterJoin."
)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
index de7f70a..c1ad08c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/UnsupportedOpsValidationTest.scala
@@ -89,11 +89,19 @@ class UnsupportedOpsValidationTest extends StreamingMultipleProgramsTestBase {
}
@Test(expected = classOf[ValidationException])
- def testLimit(): Unit = {
+ def testOffset(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.limit(0,5)
+ t1.offset(5)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testFetch(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+ t1.fetch(5)
}
@Test(expected = classOf[ValidationException])
http://git-wip-us.apache.org/repos/asf/flink/blob/e1c7e490/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala
index 568fad0..79bb711 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SortITCase.scala
@@ -143,7 +143,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
+ val t = ds.toTable(tEnv).orderBy('_1.asc).offset(3)
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )
@@ -171,7 +171,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
+ val t = ds.toTable(tEnv).orderBy('_1.desc).offset(3).fetch(5)
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
@@ -199,7 +199,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val ds = CollectionDataSets.get3TupleDataSet(env)
- val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
+ val t = ds.toTable(tEnv).orderBy('_1.asc).fetch(5)
implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )