Posted to commits@flink.apache.org by va...@apache.org on 2016/04/12 11:25:40 UTC

flink git commit: [FLINK-3731] make embedded SQL outer joins fail during translation

Repository: flink
Updated Branches:
  refs/heads/master 2e63d1afb -> 24817c8cd


[FLINK-3731] make embedded SQL outer joins fail during translation

This closes #1869


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24817c8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24817c8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24817c8c

Branch: refs/heads/master
Commit: 24817c8cd26df18e0d0b9d313d24aa528e7e2379
Parents: 2e63d1a
Author: vasia <va...@apache.org>
Authored: Mon Apr 11 19:43:59 2016 +0200
Committer: vasia <va...@apache.org>
Committed: Tue Apr 12 10:29:41 2016 +0200

----------------------------------------------------------------------
 .../plan/rules/dataSet/DataSetJoinRule.scala    | 12 +++--
 .../flink/api/scala/sql/test/JoinITCase.scala   | 53 +++++++++++++++++++-
 2 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24817c8c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
index 01b803c..89d33c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -21,12 +21,10 @@ package org.apache.flink.api.table.plan.rules.dataSet
 import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.logical.LogicalJoin
-import org.apache.calcite.rex.{RexInputRef, RexCall}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
-import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 
 class DataSetJoinRule
@@ -37,6 +35,14 @@ class DataSetJoinRule
       "FlinkJoinRule")
   {
 
+  /**
+   * Only translate INNER joins for now
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+    join.getJoinType.equals(JoinRelType.INNER)
+  }
+
     def convert(rel: RelNode): RelNode = {
 
       val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]

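For context on how the DataSetJoinRule change takes effect: in Calcite, a rule whose matches() returns false is simply never applied, so after this commit a non-INNER LogicalJoin has no rule converting it into the DataSet convention and the query fails while it is being translated (the new JoinITCase tests below expect a PlanGenException). The sketch that follows illustrates the same gating pattern in isolation; the rule name, the convention parameter and the no-op convert body are placeholders for illustration, not Flink code.

import org.apache.calcite.plan.{Convention, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.logical.LogicalJoin

// Placeholder rule: same gating pattern as DataSetJoinRule above, but with a
// no-op convert so the sketch does not depend on Flink-internal node classes.
class InnerJoinOnlyRule(out: Convention)
  extends ConverterRule(
    classOf[LogicalJoin],
    Convention.NONE,
    out,
    "InnerJoinOnlyRule") {

  // If matches returns false the planner never fires this rule, so a
  // non-INNER LogicalJoin is left without a conversion to the target convention.
  override def matches(call: RelOptRuleCall): Boolean = {
    val join = call.rel(0).asInstanceOf[LogicalJoin]
    join.getJoinType == JoinRelType.INNER
  }

  // The real DataSetJoinRule builds a DataSetJoin here; this sketch just
  // returns its input unchanged.
  override def convert(rel: RelNode): RelNode = rel
}

Registering such a rule with the planner would leave outer joins unconverted, which is how the embedded SQL translation surfaces the limitation instead of producing an incorrect inner-join plan.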
http://git-wip-us.apache.org/repos/asf/flink/blob/24817c8c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
index 74844ae..b09aa75 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.{TableException, Row}
-import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.plan.{PlanGenException, TranslationContext}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -256,4 +256,55 @@ class JoinITCase(
     val results = result.toDataSet[Row](getConfig).collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test(expected = classOf[PlanGenException])
+  def testFullOuterJoin(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+  }
+
+  @Test(expected = classOf[PlanGenException])
+  def testLeftOuterJoin(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+  }
+
+  @Test(expected = classOf[PlanGenException])
+  def testRightOuterJoin(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("Table3", ds1)
+    tEnv.registerTable("Table5", ds2)
+
+    tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect()
+  }
 }