You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 16:47:47 UTC
[1/2] flink git commit: [FLINK-6562] [table] Support implicit table references for nested fields in SQL.
Repository: flink
Updated Branches:
refs/heads/master 423f4d65e -> ef751b2a1
[FLINK-6562] [table] Support implicit table references for nested fields in SQL.
This closes #3879.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88d56a9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88d56a9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88d56a9c
Branch: refs/heads/master
Commit: 88d56a9ce46ae0b7d73b64e340c2f59e76f79bf3
Parents: 423f4d6
Author: Haohui Mai <wh...@apache.org>
Authored: Thu May 11 15:35:39 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 12 17:15:23 2017 +0200
----------------------------------------------------------------------
.../plan/schema/CompositeRelDataType.scala | 4 ++--
.../table/expressions/CompositeAccessTest.scala | 22 +++++++++++++++++++-
2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/88d56a9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index 92f9199..a60514b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.schema
import java.util
-import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType, StructKind}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.schema.CompositeRelDataType.createFieldList
@@ -36,7 +36,7 @@ import scala.collection.JavaConverters._
class CompositeRelDataType(
val compositeType: CompositeType[_],
typeFactory: FlinkTypeFactory)
- extends RelRecordType(createFieldList(compositeType, typeFactory)) {
+ extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
override def toString = s"COMPOSITE($compositeType)"
http://git-wip-us.apache.org/repos/asf/flink/blob/88d56a9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
index 2025880..91e06c5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
@@ -41,10 +41,13 @@ class CompositeAccessTest extends ExpressionTestBase {
"f0.get('intField')",
"testTable.f0.intField",
"42")
+ testSqlApi("f0.intField", "42")
testSqlApi("testTable.f0.stringField", "Bob")
+ testSqlApi("f0.stringField", "Bob")
testSqlApi("testTable.f0.booleanField", "true")
+ testSqlApi("f0.booleanField", "true")
// single field by int key
testTableApi(
@@ -58,22 +61,29 @@ class CompositeAccessTest extends ExpressionTestBase {
"f1.get('objectField').get('intField')",
"testTable.f1.objectField.intField",
"25")
+ testSqlApi("f1.objectField.intField", "25")
testSqlApi("testTable.f1.objectField.stringField", "Timo")
+ testSqlApi("f1.objectField.stringField", "Timo")
testSqlApi("testTable.f1.objectField.booleanField", "false")
+ testSqlApi("f1.objectField.booleanField", "false")
testAllApis(
'f2.get(0),
"f2.get(0)",
"testTable.f2._1",
"a")
+ testSqlApi("f2._1", "a")
testSqlApi("testTable.f3.f1", "b")
+ testSqlApi("f3.f1", "b")
testSqlApi("testTable.f4.myString", "Hello")
+ testSqlApi("f4.myString", "Hello")
testSqlApi("testTable.f5", "13")
+ testSqlApi("f5", "13")
testAllApis(
'f7.get("_1"),
@@ -83,18 +93,21 @@ class CompositeAccessTest extends ExpressionTestBase {
// composite field return type
testSqlApi("testTable.f6", "MyCaseClass2(null)")
+ testSqlApi("f6", "MyCaseClass2(null)")
testAllApis(
'f1.get("objectField"),
"f1.get('objectField')",
"testTable.f1.objectField",
"MyCaseClass(25,Timo,false)")
+ testSqlApi("f1.objectField", "MyCaseClass(25,Timo,false)")
testAllApis(
'f0,
"f0",
"testTable.f0",
"MyCaseClass(42,Bob,true)")
+ testSqlApi("f0", "MyCaseClass(42,Bob,true)")
// flattening (test base only returns first column)
testAllApis(
@@ -102,12 +115,14 @@ class CompositeAccessTest extends ExpressionTestBase {
"f1.get('objectField').flatten()",
"testTable.f1.objectField.*",
"25")
+ testSqlApi("f1.objectField.*", "25")
testAllApis(
'f0.flatten(),
"flatten(f0)",
"testTable.f0.*",
"42")
+ testSqlApi("f0.*", "42")
testTableApi(12.flatten(), "12.flatten()", "12")
@@ -115,11 +130,16 @@ class CompositeAccessTest extends ExpressionTestBase {
}
@Test(expected = classOf[ValidationException])
- def testWrongSqlField(): Unit = {
+ def testWrongSqlFieldFull(): Unit = {
testSqlApi("testTable.f5.test", "13")
}
@Test(expected = classOf[ValidationException])
+ def testWrongSqlField(): Unit = {
+ testSqlApi("f5.test", "13")
+ }
+
+ @Test(expected = classOf[ValidationException])
def testWrongIntKeyField(): Unit = {
testTableApi('f0.get(555), "'fail'", "fail")
}
[2/2] flink git commit: [FLINK-5256] [table] Extend DataSetSingleRowJoin to support Left and Right joins.
Posted by fh...@apache.org.
[FLINK-5256] [table] Extend DataSetSingleRowJoin to support Left and Right joins.
This closes #3673.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef751b2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef751b2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef751b2a
Branch: refs/heads/master
Commit: ef751b2a1636daa5d0437fe9645d6799a4f72268
Parents: 88d56a9
Author: DmytroShkvyra <ds...@gmail.com>
Authored: Wed Apr 5 12:24:30 2017 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 12 17:15:32 2017 +0200
----------------------------------------------------------------------
.../nodes/dataset/DataSetSingleRowJoin.scala | 57 ++-
.../plan/nodes/logical/FlinkLogicalJoin.scala | 13 +-
.../dataSet/DataSetSingleRowJoinRule.scala | 10 +-
.../flink/table/runtime/MapJoinLeftRunner.scala | 4 +
.../table/runtime/MapJoinRightRunner.scala | 4 +
.../batch/sql/DataSetSingleRowJoinTest.scala | 428 +++++++++++++++++++
.../scala/batch/sql/DistinctAggregateTest.scala | 6 +-
.../table/api/scala/batch/sql/JoinITCase.scala | 181 +++++++-
.../api/scala/batch/sql/SingleRowJoinTest.scala | 195 ---------
9 files changed, 671 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index b7d1a4b..e92dec5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.calcite.rex.RexNode
@@ -47,6 +48,7 @@ class DataSetSingleRowJoin(
rowRelDataType: RelDataType,
joinCondition: RexNode,
joinRowType: RelDataType,
+ joinType: JoinRelType,
ruleDescription: String)
extends BiRel(cluster, traitSet, leftNode, rightNode)
with DataSetRel {
@@ -63,6 +65,7 @@ class DataSetSingleRowJoin(
getRowType,
joinCondition,
joinRowType,
+ joinType,
ruleDescription)
}
@@ -97,7 +100,6 @@ class DataSetSingleRowJoin(
tableEnv.getConfig,
leftDataSet.getType,
rightDataSet.getType,
- leftIsSingle,
joinCondition,
broadcastSetName)
@@ -118,14 +120,18 @@ class DataSetSingleRowJoin(
config: TableConfig,
inputType1: TypeInformation[Row],
inputType2: TypeInformation[Row],
- firstIsSingle: Boolean,
joinCondition: RexNode,
broadcastInputSetName: String)
: FlatMapFunction[Row, Row] = {
+ val isOuterJoin = joinType match {
+ case JoinRelType.LEFT | JoinRelType.RIGHT => true
+ case _ => false
+ }
+
val codeGenerator = new CodeGenerator(
config,
- false,
+ isOuterJoin,
inputType1,
Some(inputType2))
@@ -138,13 +144,38 @@ class DataSetSingleRowJoin(
val condition = codeGenerator.generateExpression(joinCondition)
val joinMethodBody =
- s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
+ if (joinType == JoinRelType.INNER) {
+ s"""
+ |${condition.code}
+ |if (${condition.resultTerm}) {
+ | ${conversion.code}
+ | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
+ } else {
+ val singleNode =
+ if (!leftIsSingle) {
+ rightNode
+ }
+ else {
+ leftNode
+ }
+
+ val notSuitedToCondition = singleNode
+ .getRowType
+ .getFieldList
+ .map(field => getRowType.getFieldNames.indexOf(field.getName))
+ .map(i => s"${conversion.resultTerm}.setField($i,null);")
+
+ s"""
+ |${condition.code}
+ |${conversion.code}
+ |if(!${condition.resultTerm}){
+ |${notSuitedToCondition.mkString("\n")}
+ |}
+ |${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+ |""".stripMargin
+ }
val genFunction = codeGenerator.generateFunction(
ruleDescription,
@@ -152,16 +183,18 @@ class DataSetSingleRowJoin(
joinMethodBody,
returnType)
- if (firstIsSingle) {
+ if (leftIsSingle) {
new MapJoinRightRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
+ isOuterJoin,
genFunction.returnType,
broadcastInputSetName)
} else {
new MapJoinLeftRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
+ isOuterJoin,
genFunction.returnType,
broadcastInputSetName)
}
@@ -181,7 +214,7 @@ class DataSetSingleRowJoin(
}
private def joinTypeToString: String = {
- "NestedLoopJoin"
+ "NestedLoop" + joinType.toString.toLowerCase.capitalize + "Join"
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
index 8df0b59..33c4caf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -77,7 +77,7 @@ private class FlinkLogicalJoinConverter
val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
val joinInfo = join.analyzeCondition
- hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
+ hasEqualityPredicates(join, joinInfo) || isSingleRowJoin(join)
}
override def convert(rel: RelNode): RelNode = {
@@ -101,11 +101,12 @@ private class FlinkLogicalJoinConverter
!joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
}
- private def isSingleRowInnerJoin(join: LogicalJoin): Boolean = {
- if (join.getJoinType == JoinRelType.INNER) {
- isSingleRow(join.getRight) || isSingleRow(join.getLeft)
- } else {
- false
+ private def isSingleRowJoin(join: LogicalJoin): Boolean = {
+ join.getJoinType match {
+ case JoinRelType.INNER if isSingleRow(join.getRight) || isSingleRow(join.getLeft) => true
+ case JoinRelType.LEFT if isSingleRow(join.getRight) => true
+ case JoinRelType.RIGHT if isSingleRow(join.getLeft) => true
+ case _ => false
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
index b61573c..f964a95 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
@@ -37,10 +37,11 @@ class DataSetSingleRowJoinRule
override def matches(call: RelOptRuleCall): Boolean = {
val join = call.rel(0).asInstanceOf[FlinkLogicalJoin]
- if (isInnerJoin(join)) {
- isSingleRow(join.getRight) || isSingleRow(join.getLeft)
- } else {
- false
+ join.getJoinType match {
+ case JoinRelType.INNER if isSingleRow(join.getLeft) || isSingleRow(join.getRight) => true
+ case JoinRelType.LEFT if isSingleRow(join.getRight) => true
+ case JoinRelType.RIGHT if isSingleRow(join.getLeft) => true
+ case _ => false
}
}
@@ -79,6 +80,7 @@ class DataSetSingleRowJoinRule
rel.getRowType,
join.getCondition,
join.getRowType,
+ join.getJoinType,
description)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
index 5f3dbb4..c906b63 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
@@ -19,11 +19,13 @@
package org.apache.flink.table.runtime
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
import org.apache.flink.util.Collector
class MapJoinLeftRunner[IN1, IN2, OUT](
name: String,
code: String,
+ outerJoin: Boolean,
returnType: TypeInformation[OUT],
broadcastSetName: String)
extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
@@ -31,7 +33,9 @@ class MapJoinLeftRunner[IN1, IN2, OUT](
override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit = {
broadcastSet match {
case Some(singleInput) => function.join(multiInput, singleInput, out)
+ case None if outerJoin => function.join(multiInput, null.asInstanceOf[IN2], out)
case None =>
}
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
index e2d9331..0cb4eda 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
@@ -19,11 +19,13 @@
package org.apache.flink.table.runtime
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
import org.apache.flink.util.Collector
class MapJoinRightRunner[IN1, IN2, OUT](
name: String,
code: String,
+ outerJoin: Boolean,
returnType: TypeInformation[OUT],
broadcastSetName: String)
extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
@@ -31,7 +33,9 @@ class MapJoinRightRunner[IN1, IN2, OUT](
override def flatMap(multiInput: IN2, out: Collector[OUT]): Unit = {
broadcastSet match {
case Some(singleInput) => function.join(singleInput, multiInput, out)
+ case None if outerJoin => function.join(null.asInstanceOf[IN1], multiInput, out)
case None =>
}
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
new file mode 100644
index 0000000..1f7ce84
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class DataSetSingleRowJoinTest extends TableTestBase {
+
+ @Test
+ def testSingleRowCrossJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Int)]("A", 'a1, 'a2)
+
+ val query =
+ "SELECT a1, asum " +
+ "FROM A, (SELECT sum(a1) + sum(a2) AS asum FROM A)"
+
+ val expected =
+ binaryNode(
+ "DataSetSingleRowJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ batchTableNode(0),
+ tuples(List(null, null)),
+ term("values", "a1", "a2")
+ ),
+ term("union","a1","a2")
+ ),
+ term("select", "SUM(a1) AS $f0", "SUM(a2) AS $f1")
+ ),
+ term("select", "+($f0, $f1) AS asum")
+ ),
+ term("where", "true"),
+ term("join", "a1", "asum"),
+ term("joinType", "NestedLoopInnerJoin")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testSingleRowEquiJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+ val query =
+ "SELECT a1, a2 " +
+ "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+ "WHERE a1 = cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)),
+ term("values", "a1")
+ ),
+ term("union","a1")
+ ),
+ term("select", "COUNT(a1) AS cnt")
+ ),
+ term("where", "=(CAST(a1), cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopInnerJoin")
+ ),
+ term("select", "a1", "a2")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testSingleRowNotEquiJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+ val query =
+ "SELECT a1, a2 " +
+ "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+ "WHERE a1 < cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)),
+ term("values", "a1")
+ ),
+ term("union", "a1")
+ ),
+ term("select", "COUNT(a1) AS cnt")
+ ),
+ term("where", "<(a1, cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopInnerJoin")
+ ),
+ term("select", "a1", "a2")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testSingleRowJoinWithComplexPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long)]("A", 'a1, 'a2)
+ util.addTable[(Int, Long)]("B", 'b1, 'b2)
+
+ val query =
+ "SELECT a1, a2, b1, b2 " +
+ "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
+ "WHERE a1 < b1 AND a2 = b2"
+
+ val expected = binaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ batchTableNode(1),
+ tuples(List(null, null)),
+ term("values", "b1", "b2")
+ ),
+ term("union","b1","b2")
+ ),
+ term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
+ ),
+ term("where", "AND(<(a1, b1)", "=(a2, b2))"),
+ term("join", "a1", "a2", "b1", "b2"),
+ term("joinType", "NestedLoopInnerJoin")
+ )
+
+ util.verifySql(query, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinEqualPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Int)]("A", 'a1, 'a2)
+ util.addTable[(Int, Int)]("B", 'b1, 'b2)
+
+ val queryLeftJoin =
+ "SELECT a2 " +
+ "FROM A " +
+ " LEFT JOIN " +
+ "(SELECT COUNT(*) AS cnt FROM B) AS x " +
+ " ON a1 = cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ term("where", "=(a1, cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopLeftJoin")
+ ),
+ term("select", "a2")
+ ) + "\n" +
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "0 AS $f0")),
+ tuples(List(null)), term("values", "$f0")
+ ),
+ term("union", "$f0")
+ ),
+ term("select", "COUNT(*) AS cnt")
+ )
+
+ util.verifySql(queryLeftJoin, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Int)]("A", 'a1, 'a2)
+ util.addTable[(Int, Int)]("B", 'b1, 'b2)
+
+ val queryLeftJoin =
+ "SELECT a2 " +
+ "FROM A " +
+ " LEFT JOIN " +
+ "(SELECT COUNT(*) AS cnt FROM B) AS x " +
+ " ON a1 > cnt"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ batchTableNode(0),
+ term("where", ">(a1, cnt)"),
+ term("join", "a1", "a2", "cnt"),
+ term("joinType", "NestedLoopLeftJoin")
+ ),
+ term("select", "a2")
+ ) + "\n" +
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "0 AS $f0")),
+ tuples(List(null)), term("values", "$f0")
+ ),
+ term("union", "$f0")
+ ),
+ term("select", "COUNT(*) AS cnt")
+ )
+
+ util.verifySql(queryLeftJoin, expected)
+ }
+
+ @Test
+ def testLeftSingleRightJoinEqualPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Long)]("A", 'a1, 'a2)
+ util.addTable[(Long, Long)]("B", 'b1, 'b2)
+
+ val queryRightJoin =
+ "SELECT a1 " +
+ "FROM (SELECT COUNT(*) AS cnt FROM B) " +
+ " RIGHT JOIN " +
+ "A " +
+ " ON cnt = a2"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ "",
+ term("where", "=(cnt, a2)"),
+ term("join", "cnt", "a1", "a2"),
+ term("joinType", "NestedLoopRightJoin")
+ ),
+ term("select", "a1")
+ ) + unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "0 AS $f0")),
+ tuples(List(null)), term("values", "$f0")
+ ),
+ term("union", "$f0")
+ ),
+ term("select", "COUNT(*) AS cnt")
+ ) + "\n" +
+ batchTableNode(0)
+
+ util.verifySql(queryRightJoin, expected)
+ }
+
+ @Test
+ def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Long, Long)]("A", 'a1, 'a2)
+ util.addTable[(Long, Long)]("B", 'b1, 'b2)
+
+ val queryRightJoin =
+ "SELECT a1 " +
+ "FROM (SELECT COUNT(*) AS cnt FROM B) " +
+ " RIGHT JOIN " +
+ "A " +
+ " ON cnt < a2"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ "",
+ term("where", "<(cnt, a2)"),
+ term("join", "cnt", "a1", "a2"),
+ term("joinType", "NestedLoopRightJoin")
+ ),
+ term("select", "a1")
+ ) +
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "0 AS $f0")),
+ tuples(List(null)), term("values", "$f0")
+ ),
+ term("union", "$f0")
+ ),
+ term("select", "COUNT(*) AS cnt")
+ ) + "\n" +
+ batchTableNode(0)
+
+ util.verifySql(queryRightJoin, expected)
+ }
+
+ @Test
+ def testSingleRowJoinInnerJoin(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Int)]("A", 'a1, 'a2)
+ val query =
+ "SELECT a2, sum(a1) " +
+ "FROM A " +
+ "GROUP BY a2 " +
+ "HAVING sum(a1) > (SELECT sum(a1) * 0.1 FROM A)"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetSingleRowJoin",
+ unaryNode(
+ "DataSetAggregate",
+ batchTableNode(0),
+ term("groupBy", "a2"),
+ term("select", "a2", "SUM(a1) AS EXPR$1")
+ ),
+ term("where", ">(EXPR$1, EXPR$0)"),
+ term("join", "a2", "EXPR$1", "EXPR$0"),
+ term("joinType", "NestedLoopInnerJoin")
+ ),
+ term("select", "a2", "EXPR$1")
+ ) + "\n" +
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ unaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetValues",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)), term("values", "a1")
+ ),
+ term("union", "a1")
+ ),
+ term("select", "SUM(a1) AS $f0")
+ ),
+ term("select", "*($f0, 0.1) AS EXPR$0")
+ )
+
+ util.verifySql(query, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
index 54b4d24..85cfb18 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
@@ -211,7 +211,7 @@ class DistinctAggregateTest extends TableTestBase {
),
term("where", "true"),
term("join", "EXPR$0", "EXPR$1"),
- term("joinType", "NestedLoopJoin")
+ term("joinType", "NestedLoopInnerJoin")
)
util.verifySql(sqlQuery, expected)
@@ -268,7 +268,7 @@ class DistinctAggregateTest extends TableTestBase {
),
term("where", "true"),
term("join", "EXPR$2, EXPR$0"),
- term("joinType", "NestedLoopJoin")
+ term("joinType", "NestedLoopInnerJoin")
),
unaryNode(
"DataSetAggregate",
@@ -294,7 +294,7 @@ class DistinctAggregateTest extends TableTestBase {
),
term("where", "true"),
term("join", "EXPR$2", "EXPR$0, EXPR$1"),
- term("joinType", "NestedLoopJoin")
+ term("joinType", "NestedLoopInnerJoin")
),
term("select", "EXPR$0, EXPR$1, EXPR$2")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
index 8a8c0ce..3ed44a3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -40,10 +40,8 @@ class JoinITCase(
@Test
def testJoin(): Unit = {
-
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
@@ -280,8 +278,6 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- tEnv.sql(sqlQuery).toDataSet[Row].collect()
-
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
"null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
"null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
@@ -304,8 +300,6 @@ class JoinITCase(
tEnv.registerTable("Table3", ds1)
tEnv.registerTable("Table5", ds2)
- tEnv.sql(sqlQuery).toDataSet[Row].collect()
-
val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
"null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
"null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
@@ -372,11 +366,184 @@ class JoinITCase(
val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
val result = tEnv.sql(sqlQuery1).count()
-
Assert.assertEquals(0, result)
}
@Test
+ def testLeftNullRightJoin(): Unit = { // RIGHT JOIN whose left input is an empty aggregate
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt " +
+ "FROM (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) RIGHT JOIN A ON a < cnt"
+ // COUNT(*) is never negative, so the cnt < 0 filter leaves the left join input without rows.
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+
+ // With nothing to match against, every row of A is expected back with cnt padded to null.
+ val result = tEnv.sql(sqlQuery)
+ val expected = Seq(
+ "1,null",
+ "2,null", "2,null",
+ "3,null", "3,null", "3,null",
+ "4,null", "4,null", "4,null", "4,null",
+ "5,null", "5,null", "5,null", "5,null", "5,null").mkString("\n")
+ // Convert the result table to a DataSet of rows, collect it and hand it to the text comparison.
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+
+ @Test
+ def testLeftSingleRightJoinEqualPredicate(): Unit = { // RIGHT JOIN, aggregate on the left, cnt = a
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt = a"
+ // The ungrouped COUNT(*) yields a single row; A is the preserved (right) side of the join.
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+ // Expected: only rows with a = 3 carry cnt (COUNT(*) over B is presumably 3); the rest get null.
+ val result = tEnv.sql(sqlQuery)
+ val expected = Seq(
+ "1,null", "2,null", "2,null", "3,3", "3,3",
+ "3,3", "4,null", "4,null", "4,null",
+ "4,null", "5,null", "5,null", "5,null",
+ "5,null", "5,null").mkString("\n")
+ // Convert the result table to a DataSet of rows, collect it and hand it to the text comparison.
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testLeftSingleRightJoinNotEqualPredicate(): Unit = { // RIGHT JOIN, aggregate on the left, cnt > a
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt > a"
+ // Same shape as the equality variant, but joined on an inequality instead of an equi-condition.
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+ // Expected: rows with a below cnt (a = 1 and a = 2) carry cnt = 3; all remaining rows get null.
+ val result = tEnv.sql(sqlQuery)
+ val expected = Seq(
+ "1,3", "2,3", "2,3", "3,null", "3,null",
+ "3,null", "4,null", "4,null", "4,null",
+ "4,null", "5,null", "5,null", "5,null",
+ "5,null", "5,null").mkString("\n")
+ // Convert the result table to a DataSet of rows, collect it and hand it to the text comparison.
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightNullLeftJoin(): Unit = { // LEFT JOIN whose right input is an empty aggregate
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt " +
+ "FROM A LEFT JOIN (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) ON cnt > a"
+ // Note the table binding: here A is the small 3-tuple table (ds2) and B the 5-tuple table (ds1).
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("A", ds2)
+ tEnv.registerTable("B", ds1)
+ // COUNT(*) is never negative, so the cnt < 0 filter leaves the right join input without rows.
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = Seq(
+ "2,null", "3,null", "1,null").mkString("\n")
+ // Every row of A is expected back with cnt padded to null; collect and compare as text.
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinEqualPredicate(): Unit = { // LEFT JOIN, aggregate on the right, cnt = a
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt FROM A LEFT JOIN (SELECT COUNT(*) AS cnt FROM B) ON cnt = a"
+ // The ungrouped COUNT(*) yields a single row; A is the preserved (left) side of the join.
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+ // Expected: only rows with a = 3 carry cnt (COUNT(*) over B is presumably 3); the rest get null.
+ val expected = Seq(
+ "1,null", "2,null", "2,null", "3,3", "3,3",
+ "3,3", "4,null", "4,null", "4,null",
+ "4,null", "5,null", "5,null", "5,null",
+ "5,null", "5,null").mkString("\n")
+ // Convert the result table to a DataSet of rows, collect it and hand it to the text comparison.
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinNotEqualPredicate(): Unit = { // LEFT JOIN, aggregate on the right, cnt < a
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt FROM A LEFT JOIN (SELECT COUNT(*) AS cnt FROM B) ON cnt < a"
+ // Same shape as the equality variant, but joined on an inequality instead of an equi-condition.
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("A", ds1)
+ tEnv.registerTable("B", ds2)
+
+ val result = tEnv.sql(sqlQuery)
+ // Expected: rows with a above cnt (a = 4 and a = 5) carry cnt = 3; all remaining rows get null.
+ val expected = Seq(
+ "1,null", "2,null", "2,null", "3,null", "3,null",
+ "3,null", "4,3", "4,3", "4,3",
+ "4,3", "5,3", "5,3", "5,3",
+ "5,3", "5,3").mkString("\n")
+ // Convert the result table to a DataSet of rows, collect it and hand it to the text comparison.
+ val results = result.toDataSet[Row].collect()
+
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
+ def testRightSingleLeftJoinTwoFields(): Unit = { // LEFT JOIN against a two-column aggregate row
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ val sqlQuery =
+ "SELECT a, cnt, cnt2 " +
+ "FROM t1 LEFT JOIN (SELECT COUNT(*) AS cnt,COUNT(*) AS cnt2 FROM t2 ) AS x ON a = cnt"
+ // The aggregate side exposes two columns (cnt, cnt2); only cnt takes part in the join condition.
+ val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+ tEnv.registerTable("t1", ds1)
+ tEnv.registerTable("t2", ds2)
+ // Expected: rows with a = 3 carry both counts; unmatched rows get null in both aggregate columns.
+ val result = tEnv.sql(sqlQuery)
+ val expected = Seq(
+ "1,null,null",
+ "2,null,null", "2,null,null",
+ "3,3,3", "3,3,3", "3,3,3",
+ "4,null,null", "4,null,null", "4,null,null", "4,null,null",
+ "5,null,null", "5,null,null", "5,null,null", "5,null,null", "5,null,null").mkString("\n")
+ // Convert the result table to a DataSet of rows, collect it and hand it to the text comparison.
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
def testCrossWithUnnest(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
deleted file mode 100644
index 27e3853..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class SingleRowJoinTest extends TableTestBase {
-
- @Test
- def testSingleRowJoinWithCalcInput(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Int)]("A", 'a1, 'a2)
-
- val query =
- "SELECT a1, asum " +
- "FROM A, (SELECT sum(a1) + sum(a2) AS asum FROM A)"
-
- val expected =
- binaryNode(
- "DataSetSingleRowJoin",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a1")
- ),
- unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetUnion",
- unaryNode(
- "DataSetValues",
- batchTableNode(0),
- tuples(List(null, null)),
- term("values", "a1", "a2")
- ),
- term("union","a1","a2")
- ),
- term("select", "SUM(a1) AS $f0", "SUM(a2) AS $f1")
- ),
- term("select", "+($f0, $f1) AS asum")
- ),
- term("where", "true"),
- term("join", "a1", "asum"),
- term("joinType", "NestedLoopJoin")
- )
-
- util.verifySql(query, expected)
- }
-
- @Test
- def testSingleRowEquiJoin(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, String)]("A", 'a1, 'a2)
-
- val query =
- "SELECT a1, a2 " +
- "FROM A, (SELECT count(a1) AS cnt FROM A) " +
- "WHERE a1 = cnt"
-
- val expected =
- unaryNode(
- "DataSetCalc",
- binaryNode(
- "DataSetSingleRowJoin",
- batchTableNode(0),
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetUnion",
- unaryNode(
- "DataSetValues",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a1")
- ),
- tuples(List(null)),
- term("values", "a1")
- ),
- term("union","a1")
- ),
- term("select", "COUNT(a1) AS cnt")
- ),
- term("where", "=(CAST(a1), cnt)"),
- term("join", "a1", "a2", "cnt"),
- term("joinType", "NestedLoopJoin")
- ),
- term("select", "a1", "a2")
- )
-
- util.verifySql(query, expected)
- }
-
- @Test
- def testSingleRowNotEquiJoin(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, String)]("A", 'a1, 'a2)
-
- val query =
- "SELECT a1, a2 " +
- "FROM A, (SELECT count(a1) AS cnt FROM A) " +
- "WHERE a1 < cnt"
-
- val expected =
- unaryNode(
- "DataSetCalc",
- binaryNode(
- "DataSetSingleRowJoin",
- batchTableNode(0),
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetUnion",
- unaryNode(
- "DataSetValues",
- unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a1")
- ),
- tuples(List(null)),
- term("values", "a1")
- ),
- term("union", "a1")
- ),
- term("select", "COUNT(a1) AS cnt")
- ),
- term("where", "<(a1, cnt)"),
- term("join", "a1", "a2", "cnt"),
- term("joinType", "NestedLoopJoin")
- ),
- term("select", "a1", "a2")
- )
-
- util.verifySql(query, expected)
- }
-
- @Test
- def testSingleRowJoinWithComplexPredicate(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long)]("A", 'a1, 'a2)
- util.addTable[(Int, Long)]("B", 'b1, 'b2)
-
- val query =
- "SELECT a1, a2, b1, b2 " +
- "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
- "WHERE a1 < b1 AND a2 = b2"
-
- val expected = binaryNode(
- "DataSetSingleRowJoin",
- batchTableNode(0),
- unaryNode(
- "DataSetAggregate",
- unaryNode(
- "DataSetUnion",
- unaryNode(
- "DataSetValues",
- batchTableNode(1),
- tuples(List(null, null)),
- term("values", "b1", "b2")
- ),
- term("union","b1","b2")
- ),
- term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
- ),
- term("where", "AND(<(a1, b1)", "=(a2, b2))"),
- term("join", "a1", "a2", "b1", "b2"),
- term("joinType", "NestedLoopJoin")
- )
-
- util.verifySql(query, expected)
- }
-}