You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2016/04/12 11:25:40 UTC
flink git commit: [FLINK-3731] make embedded SQL outer joins fail during translation
Repository: flink
Updated Branches:
refs/heads/master 2e63d1afb -> 24817c8cd
[FLINK-3731] make embedded SQL outer joins fail during translation
This closes #1869
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24817c8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24817c8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24817c8c
Branch: refs/heads/master
Commit: 24817c8cd26df18e0d0b9d313d24aa528e7e2379
Parents: 2e63d1a
Author: vasia <va...@apache.org>
Authored: Mon Apr 11 19:43:59 2016 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Apr 12 10:29:41 2016 +0200
----------------------------------------------------------------------
.../plan/rules/dataSet/DataSetJoinRule.scala | 12 +++--
.../flink/api/scala/sql/test/JoinITCase.scala | 53 +++++++++++++++++++-
2 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/24817c8c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
index 01b803c..89d33c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -21,12 +21,10 @@ package org.apache.flink.api.table.plan.rules.dataSet
import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.calcite.rex.{RexInputRef, RexCall}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
-import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
class DataSetJoinRule
@@ -37,6 +35,14 @@ class DataSetJoinRule
"FlinkJoinRule")
{
+ /**
+ * Only translate INNER joins for now
+ */
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+ join.getJoinType.equals(JoinRelType.INNER)
+ }
+
def convert(rel: RelNode): RelNode = {
val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
http://git-wip-us.apache.org/repos/asf/flink/blob/24817c8c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
index 74844ae..b09aa75 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.{TableException, Row}
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.plan.{PlanGenException, TranslationContext}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -256,4 +256,55 @@ class JoinITCase(
val results = result.toDataSet[Row](getConfig).collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
+ @Test(expected = classOf[PlanGenException])
+ def testFullOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+ }
+
+ @Test(expected = classOf[PlanGenException])
+ def testLeftOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+ }
+
+ @Test(expected = classOf[PlanGenException])
+ def testRightOuterJoin(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = getScalaTableEnvironment
+ TranslationContext.reset()
+
+ val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e"
+
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+ }
}