You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/01/19 22:52:33 UTC
[1/3] flink git commit: [FLINK-5520] [table] Disable outer joins with
non-equality predicates.
Repository: flink
Updated Branches:
refs/heads/master 4833e74e7 -> d1301c82b
[FLINK-5520] [table] Disable outer joins with non-equality predicates.
This closes #3141.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1301c82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1301c82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1301c82
Branch: refs/heads/master
Commit: d1301c82b85c00284d90e8f5bdac4fd86dc5b173
Parents: 0ea996a
Author: lincoln-lil <li...@gmail.com>
Authored: Tue Jan 17 22:42:39 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jan 19 23:52:00 2017 +0100
----------------------------------------------------------------------
.../flink/table/plan/logical/operators.scala | 52 ++++++---
.../table/plan/nodes/dataset/DataSetJoin.scala | 3 +-
.../plan/rules/dataSet/DataSetJoinRule.scala | 7 +-
.../table/api/scala/batch/sql/JoinITCase.scala | 107 ++++++++++++++++++-
.../api/scala/batch/table/JoinITCase.scala | 89 +++++++++++++--
.../stringexpr/JoinStringExpressionTest.scala | 18 ----
6 files changed, 228 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 743bdfe..3ba0285 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -442,7 +442,7 @@ case class Join(
val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
- failValidation(s"Filter operator requires a boolean expression as input, " +
+ failValidation(s"Filter operator requires a boolean expression as input, " +
s"but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}")
} else if (ambiguousName.nonEmpty) {
failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
@@ -454,30 +454,54 @@ case class Join(
private def testJoinCondition(expression: Expression): Unit = {
- def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
- case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
- if x.isFromLeftInput != y.isFromLeftInput => Unit
- case x => failValidation(
- s"Invalid non-join predicate $exp. For non-join predicates use Table#where.")
- }
+ def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
+ case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
+ if x.isFromLeftInput != y.isFromLeftInput => true
+ case _ => false
+ }
+
+ def checkIfFilterCondition(exp: BinaryComparison) = exp.children match {
+ case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil => false
+ case (x: JoinFieldReference) :: (_) :: Nil => true
+ case (_) :: (y: JoinFieldReference) :: Nil => true
+ case _ => false
+ }
+
+ var equiJoinPredicateFound = false
+ var nonEquiJoinPredicateFound = false
+ var localPredicateFound = false
- var equiJoinFound = false
def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
case x: And => x.children.foreach(validateConditions(_, isAndBranch))
case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
case x: EqualTo =>
- if (isAndBranch) {
- equiJoinFound = true
+ if (isAndBranch && checkIfJoinCondition(x)) {
+ equiJoinPredicateFound = true
+ }
+ if (checkIfFilterCondition(x)) {
+ localPredicateFound = true
}
- checkIfJoinCondition(x)
- case x: BinaryComparison => checkIfJoinCondition(x)
+ case x: BinaryComparison => {
+ if (checkIfFilterCondition(x)) {
+ localPredicateFound = true
+ } else {
+ nonEquiJoinPredicateFound = true
+ }
+ }
case x => failValidation(
s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
}
validateConditions(expression, isAndBranch = true)
- if (!equiJoinFound) {
- failValidation(s"Invalid join condition: $expression. At least one equi-join required.")
+ if (!equiJoinPredicateFound) {
+ failValidation(
+ s"Invalid join condition: $expression. At least one equi-join predicate is " +
+ s"required.")
+ }
+ if (joinType != JoinType.INNER && (nonEquiJoinPredicateFound || localPredicateFound)) {
+ failValidation(
+ s"Invalid join condition: $expression. Non-equality join predicates or local" +
+ s" predicates are not supported in outer joins.")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index 324e949..edb5be2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -31,7 +31,8 @@ import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.FlatJoinRunner
import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rex.RexNode
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import scala.collection.JavaConversions._
http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
index 2874198..3f49c6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.plan.rules.dataSet
import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.logical.LogicalJoin
-
-import org.apache.flink.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin}
import scala.collection.JavaConversions._
@@ -40,7 +40,8 @@ class DataSetJoinRule
val joinInfo = join.analyzeCondition
// joins require an equi-condition or a conjunctive predicate with at least one equi-condition
- !joinInfo.pairs().isEmpty
+ // and disable outer joins with non-equality predicates(see FLINK-5520)
+ !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
}
override def convert(rel: RelNode): RelNode = {
http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
index d07c282..9df17ad 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -78,7 +78,7 @@ class JoinITCase(
}
@Test
- def testJoinWithJoinFilter(): Unit = {
+ def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -92,8 +92,7 @@ class JoinITCase(
val result = tEnv.sql(sqlQuery)
- val expected = "Hello world, how are you?,Hallo Welt wie\n" +
- "I am fine.,Hallo Welt wie\n"
+ val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -375,4 +374,106 @@ class JoinITCase(
Assert.assertEquals(0, result)
}
+
+ @Test(expected = classOf[TableException])
+ def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[TableException])
+ def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[TableException])
+ def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRightOuterJoinWithLocalPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[TableException])
+ def testLeftOuterJoinWithLocalPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[TableException])
+ def testFullOuterJoinWithLocalPredicate(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row].collect()
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
index 3305949..5993728 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
@@ -78,11 +78,26 @@ class JoinITCase(
val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ val joinT = ds1.join(ds2).where('b === 'e && 'a < 6).select('c, 'g)
+
+ val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
+ "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
+ val results = joinT.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
- val expected = "Hello world, how are you?,Hallo Welt wie\n" +
- "I am fine.,Hallo Welt wie\n"
val results = joinT.toDataSet[Row].collect()
+ val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -97,7 +112,7 @@ class JoinITCase(
val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
- "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+ "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
val results = joinT.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -234,8 +249,8 @@ class JoinITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- @Test
- def testRightJoinWithNotOnlyEquiJoin(): Unit = {
+ @Test(expected = classOf[ValidationException])
+ def testLeftJoinWithNonEquiJoinPredicate(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
tEnv.getConfig.setNullCheck(true)
@@ -243,11 +258,67 @@ class JoinITCase(
val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
- val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+ ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
+ }
- val expected = "Hello world,BCD\n"
- val results = joinT.toDataSet[Row].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
+ @Test(expected = classOf[ValidationException])
+ def testFullJoinWithNonEquiJoinPredicate(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testRightJoinWithNonEquiJoinPredicate(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testLeftJoinWithLocalPredicate(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testFullJoinWithLocalPredicate(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testRightJoinWithLocalPredicate(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ tEnv.getConfig.setNullCheck(true)
+
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+ ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/d1301c82/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
index 025cda9..b2f683c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
@@ -238,24 +238,6 @@ class JoinStringExpressionTest {
}
@Test
- def testRightJoinWithNotOnlyEquiJoin(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- tEnv.getConfig.setNullCheck(true)
-
- val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
- val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
- val t2 = ds1.rightOuterJoin(ds2, "a = d && b < h").select("c, g")
-
- val lPlan1 = t1.logicalPlan
- val lPlan2 = t2.logicalPlan
-
- Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
- }
-
- @Test
def testFullOuterJoinWithMultipleKeys(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
[2/3] flink git commit: [FLINK-5452] [table] Fix SortITCase which
fails under cluster mode.
Posted by fh...@apache.org.
[FLINK-5452] [table] Fix SortITCase which fails under cluster mode.
This closes #3095.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ea996a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ea996a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ea996a6
Branch: refs/heads/master
Commit: 0ea996a64ca0ff9589ffcd5b89967f51aee1ffe6
Parents: 0c6e0ee
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Jan 19 16:31:24 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jan 19 23:52:00 2017 +0100
----------------------------------------------------------------------
.../table/api/scala/batch/sql/SortITCase.scala | 81 +++++++++++-------
.../api/scala/batch/table/SortITCase.scala | 88 ++++++++++++--------
2 files changed, 107 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea996a6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
index 43847dc..c577797 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SortITCase.scala
@@ -18,16 +18,15 @@
package org.apache.flink.table.api.scala.batch.sql
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.scala._
-import org.apache.flink.api.scala._
-import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -35,77 +34,95 @@ import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
-class SortITCase(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
+class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {
+
+ private def getExecutionEnvironment = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ // set the parallelism explicitly to make sure the query is executed in
+ // a distributed manner
+ env.setParallelism(3)
+ env
+ }
@Test
def testOrderByMultipleFieldsWithSql(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
+ val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
(- x.productElement(0).asInstanceOf[Int], - x.productElement(1).asInstanceOf[Long]))
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
val expected = sortExpectedly(tupleDataSetStrings)
+ // squash all rows inside a partition into one element
val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ def rowOrdering = Ordering.by((r : Row) => {
+ // ordering for this tuple will fall into the previous defined tupleOrdering,
+ // so we just need to return the field by their defining sequence
+ (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
+ })
+
val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(rowOrdering)
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
@Test
def testOrderByWithOffset(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
+ val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
+ // squash all rows inside a partition into one element
val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
val result = results.
- filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
@Test
def testOrderByWithOffsetAndFetch(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
+ val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY"
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
+ // squash all rows inside a partition into one element
val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
val result = results
.filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
.reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
@@ -113,31 +130,39 @@ class SortITCase(
@Test
def testOrderByLimit(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
+ val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable ORDER BY _2, _1 LIMIT 5"
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
(x.productElement(1).asInstanceOf[Long], x.productElement(0).asInstanceOf[Int]) )
val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)
val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+ // squash all rows inside a partition into one element
val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ def rowOrdering = Ordering.by((r : Row) => {
+ // ordering for this tuple will fall into the previous defined tupleOrdering,
+ // so we just need to return the field by their defining sequence
+ (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
+ })
+
val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(rowOrdering)
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
@Test(expected = classOf[TableException])
def testLimitWithoutOrder(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
+ val env = getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
http://git-wip-us.apache.org/repos/asf/flink/blob/0ea996a6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
index 6fe7624..a84d8a9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
@@ -18,15 +18,15 @@
package org.apache.flink.table.api.scala.batch.table
-import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -34,13 +34,13 @@ import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
-class SortITCase(
- configMode: TableConfigMode)
- extends TableProgramsCollectionTestBase(configMode) {
+class SortITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) {
- def getExecutionEnvironment = {
+ private def getExecutionEnvironment = {
val env = ExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(4)
+ // set the parallelism explicitly to make sure the query is executed in
+ // a distributed manner
+ env.setParallelism(3)
env
}
@@ -51,16 +51,18 @@ class SortITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.desc)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
val expected = sortExpectedly(tupleDataSetStrings)
+ // squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
@@ -72,16 +74,18 @@ class SortITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.asc)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )
val expected = sortExpectedly(tupleDataSetStrings)
+ // squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
@@ -93,16 +97,24 @@ class SortITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
(x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )
val expected = sortExpectedly(tupleDataSetStrings)
+ // squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ def rowOrdering = Ordering.by((r : Row) => {
+ // ordering for this tuple will fall back to the previously defined tupleOrdering,
+ // so we just need to return the fields in their defining sequence
+ (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
+ })
+
val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(rowOrdering)
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
@@ -114,16 +126,18 @@ class SortITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )
val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
+ // squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
@@ -135,16 +149,18 @@ class SortITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
- x.productElement(0).asInstanceOf[Int] )
val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
+ // squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
@@ -156,16 +172,20 @@ class SortITCase(
val ds = CollectionDataSets.get3TupleDataSet(env)
val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
- implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+ implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
x.productElement(0).asInstanceOf[Int] )
val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+ // squash all rows inside a partition into one element
val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+ implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int])
+
val result = results
- .filterNot(_.isEmpty)
- .sortBy(_.head)(Ordering.by(f=> f.toString))
- .reduceLeft(_ ++ _)
+ .filterNot(_.isEmpty)
+ // sort all partitions by their head element to verify the order across partitions
+ .sortBy(_.head)
+ .reduceLeft(_ ++ _)
TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
}
[3/3] flink git commit: [hotfix] [docs] Add closing parentheses on
"DataStream API Programming Guide".
Posted by fh...@apache.org.
[hotfix] [docs] Add closing parentheses on "DataStream API Programming Guide".
This closes #3153.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c6e0ee9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c6e0ee9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c6e0ee9
Branch: refs/heads/master
Commit: 0c6e0ee9c15628dcdf76fd7ce4dae4614f589ce4
Parents: 4833e74
Author: Keiji Yoshida <ke...@gmail.com>
Authored: Wed Jan 18 21:01:36 2017 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jan 19 23:52:00 2017 +0100
----------------------------------------------------------------------
docs/dev/datastream_api.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c6e0ee9/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 850d8c5..4fc79dd 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -348,7 +348,7 @@ windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
-};
+});
{% endhighlight %}
</td>
</tr>
@@ -363,7 +363,7 @@ windowedStream.fold("start", new FoldFunction<Integer, String>() {
public String fold(String current, Integer value) {
return current + "-" + value;
}
-};
+});
{% endhighlight %}
</td>
</tr>