You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 16:47:47 UTC

[1/2] flink git commit: [FLINK-6562] [table] Support implicit table references for nested fields in SQL.

Repository: flink
Updated Branches:
  refs/heads/master 423f4d65e -> ef751b2a1


[FLINK-6562] [table] Support implicit table references for nested fields in SQL.

This closes #3879.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88d56a9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88d56a9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88d56a9c

Branch: refs/heads/master
Commit: 88d56a9ce46ae0b7d73b64e340c2f59e76f79bf3
Parents: 423f4d6
Author: Haohui Mai <wh...@apache.org>
Authored: Thu May 11 15:35:39 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 12 17:15:23 2017 +0200

----------------------------------------------------------------------
 .../plan/schema/CompositeRelDataType.scala      |  4 ++--
 .../table/expressions/CompositeAccessTest.scala | 22 +++++++++++++++++++-
 2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88d56a9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index 92f9199..a60514b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.schema
 
 import java.util
 
-import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType, StructKind}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.CompositeRelDataType.createFieldList
@@ -36,7 +36,7 @@ import scala.collection.JavaConverters._
 class CompositeRelDataType(
     val compositeType: CompositeType[_],
     typeFactory: FlinkTypeFactory)
-  extends RelRecordType(createFieldList(compositeType, typeFactory)) {
+  extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
 
   override def toString = s"COMPOSITE($compositeType)"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88d56a9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
index 2025880..91e06c5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
@@ -41,10 +41,13 @@ class CompositeAccessTest extends ExpressionTestBase {
       "f0.get('intField')",
       "testTable.f0.intField",
       "42")
+    testSqlApi("f0.intField", "42")
 
     testSqlApi("testTable.f0.stringField", "Bob")
+    testSqlApi("f0.stringField", "Bob")
 
     testSqlApi("testTable.f0.booleanField", "true")
+    testSqlApi("f0.booleanField", "true")
 
     // single field by int key
     testTableApi(
@@ -58,22 +61,29 @@ class CompositeAccessTest extends ExpressionTestBase {
       "f1.get('objectField').get('intField')",
       "testTable.f1.objectField.intField",
       "25")
+    testSqlApi("f1.objectField.intField", "25")
 
     testSqlApi("testTable.f1.objectField.stringField", "Timo")
+    testSqlApi("f1.objectField.stringField", "Timo")
 
     testSqlApi("testTable.f1.objectField.booleanField", "false")
+    testSqlApi("f1.objectField.booleanField", "false")
 
     testAllApis(
       'f2.get(0),
       "f2.get(0)",
       "testTable.f2._1",
       "a")
+    testSqlApi("f2._1", "a")
 
     testSqlApi("testTable.f3.f1", "b")
+    testSqlApi("f3.f1", "b")
 
     testSqlApi("testTable.f4.myString", "Hello")
+    testSqlApi("f4.myString", "Hello")
 
     testSqlApi("testTable.f5", "13")
+    testSqlApi("f5", "13")
 
     testAllApis(
       'f7.get("_1"),
@@ -83,18 +93,21 @@ class CompositeAccessTest extends ExpressionTestBase {
 
     // composite field return type
     testSqlApi("testTable.f6", "MyCaseClass2(null)")
+    testSqlApi("f6", "MyCaseClass2(null)")
 
     testAllApis(
       'f1.get("objectField"),
       "f1.get('objectField')",
       "testTable.f1.objectField",
       "MyCaseClass(25,Timo,false)")
+    testSqlApi("f1.objectField", "MyCaseClass(25,Timo,false)")
 
     testAllApis(
       'f0,
       "f0",
       "testTable.f0",
       "MyCaseClass(42,Bob,true)")
+    testSqlApi("f0", "MyCaseClass(42,Bob,true)")
 
     // flattening (test base only returns first column)
     testAllApis(
@@ -102,12 +115,14 @@ class CompositeAccessTest extends ExpressionTestBase {
       "f1.get('objectField').flatten()",
       "testTable.f1.objectField.*",
       "25")
+    testSqlApi("f1.objectField.*", "25")
 
     testAllApis(
       'f0.flatten(),
       "flatten(f0)",
       "testTable.f0.*",
       "42")
+    testSqlApi("f0.*", "42")
 
     testTableApi(12.flatten(), "12.flatten()", "12")
 
@@ -115,11 +130,16 @@ class CompositeAccessTest extends ExpressionTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
-  def testWrongSqlField(): Unit = {
+  def testWrongSqlFieldFull(): Unit = {
     testSqlApi("testTable.f5.test", "13")
   }
 
   @Test(expected = classOf[ValidationException])
+  def testWrongSqlField(): Unit = {
+    testSqlApi("f5.test", "13")
+  }
+
+  @Test(expected = classOf[ValidationException])
   def testWrongIntKeyField(): Unit = {
     testTableApi('f0.get(555), "'fail'", "fail")
   }


[2/2] flink git commit: [FLINK-5256] [table] Extend DataSetSingleRowJoin to support Left and Right joins.

Posted by fh...@apache.org.
[FLINK-5256] [table] Extend DataSetSingleRowJoin to support Left and Right joins.

This closes #3673.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef751b2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef751b2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef751b2a

Branch: refs/heads/master
Commit: ef751b2a1636daa5d0437fe9645d6799a4f72268
Parents: 88d56a9
Author: DmytroShkvyra <ds...@gmail.com>
Authored: Wed Apr 5 12:24:30 2017 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 12 17:15:32 2017 +0200

----------------------------------------------------------------------
 .../nodes/dataset/DataSetSingleRowJoin.scala    |  57 ++-
 .../plan/nodes/logical/FlinkLogicalJoin.scala   |  13 +-
 .../dataSet/DataSetSingleRowJoinRule.scala      |  10 +-
 .../flink/table/runtime/MapJoinLeftRunner.scala |   4 +
 .../table/runtime/MapJoinRightRunner.scala      |   4 +
 .../batch/sql/DataSetSingleRowJoinTest.scala    | 428 +++++++++++++++++++
 .../scala/batch/sql/DistinctAggregateTest.scala |   6 +-
 .../table/api/scala/batch/sql/JoinITCase.scala  | 181 +++++++-
 .../api/scala/batch/sql/SingleRowJoinTest.scala | 195 ---------
 9 files changed, 671 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index b7d1a4b..e92dec5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
@@ -47,6 +48,7 @@ class DataSetSingleRowJoin(
     rowRelDataType: RelDataType,
     joinCondition: RexNode,
     joinRowType: RelDataType,
+    joinType: JoinRelType,
     ruleDescription: String)
   extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataSetRel {
@@ -63,6 +65,7 @@ class DataSetSingleRowJoin(
       getRowType,
       joinCondition,
       joinRowType,
+      joinType,
       ruleDescription)
   }
 
@@ -97,7 +100,6 @@ class DataSetSingleRowJoin(
       tableEnv.getConfig,
       leftDataSet.getType,
       rightDataSet.getType,
-      leftIsSingle,
       joinCondition,
       broadcastSetName)
 
@@ -118,14 +120,18 @@ class DataSetSingleRowJoin(
       config: TableConfig,
       inputType1: TypeInformation[Row],
       inputType2: TypeInformation[Row],
-      firstIsSingle: Boolean,
       joinCondition: RexNode,
       broadcastInputSetName: String)
     : FlatMapFunction[Row, Row] = {
 
+    val isOuterJoin = joinType match {
+      case JoinRelType.LEFT | JoinRelType.RIGHT => true
+      case _ => false
+    }    
+    
     val codeGenerator = new CodeGenerator(
       config,
-      false,
+      isOuterJoin,
       inputType1,
       Some(inputType2))
 
@@ -138,13 +144,38 @@ class DataSetSingleRowJoin(
     val condition = codeGenerator.generateExpression(joinCondition)
 
     val joinMethodBody =
-      s"""
-        |${condition.code}
-        |if (${condition.resultTerm}) {
-        |  ${conversion.code}
-        |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
-        |}
-        |""".stripMargin
+      if (joinType == JoinRelType.INNER) {
+        s"""
+         |${condition.code}
+         |if (${condition.resultTerm}) {
+         |  ${conversion.code}
+         |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+         |}
+         |""".stripMargin
+    } else {
+      val singleNode =
+        if (!leftIsSingle) {
+          rightNode
+        }
+        else {
+          leftNode
+        }
+
+      val notSuitedToCondition = singleNode
+        .getRowType
+        .getFieldList
+        .map(field => getRowType.getFieldNames.indexOf(field.getName))
+        .map(i => s"${conversion.resultTerm}.setField($i,null);")
+
+        s"""
+           |${condition.code}
+           |${conversion.code}
+           |if(!${condition.resultTerm}){
+           |${notSuitedToCondition.mkString("\n")}
+           |}
+           |${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+           |""".stripMargin
+    }
 
     val genFunction = codeGenerator.generateFunction(
       ruleDescription,
@@ -152,16 +183,18 @@ class DataSetSingleRowJoin(
       joinMethodBody,
       returnType)
 
-    if (firstIsSingle) {
+    if (leftIsSingle) {
       new MapJoinRightRunner[Row, Row, Row](
         genFunction.name,
         genFunction.code,
+        isOuterJoin,
         genFunction.returnType,
         broadcastInputSetName)
     } else {
       new MapJoinLeftRunner[Row, Row, Row](
         genFunction.name,
         genFunction.code,
+        isOuterJoin,
         genFunction.returnType,
         broadcastInputSetName)
     }
@@ -181,7 +214,7 @@ class DataSetSingleRowJoin(
   }
 
   private def joinTypeToString: String = {
-    "NestedLoopJoin"
+    "NestedLoop" + joinType.toString.toLowerCase.capitalize + "Join"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
index 8df0b59..33c4caf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -77,7 +77,7 @@ private class FlinkLogicalJoinConverter
     val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
     val joinInfo = join.analyzeCondition
 
-    hasEqualityPredicates(join, joinInfo) || isSingleRowInnerJoin(join)
+    hasEqualityPredicates(join, joinInfo) || isSingleRowJoin(join)
   }
 
   override def convert(rel: RelNode): RelNode = {
@@ -101,11 +101,12 @@ private class FlinkLogicalJoinConverter
     !joinInfo.pairs().isEmpty && (joinInfo.isEqui || join.getJoinType == JoinRelType.INNER)
   }
 
-  private def isSingleRowInnerJoin(join: LogicalJoin): Boolean = {
-    if (join.getJoinType == JoinRelType.INNER) {
-      isSingleRow(join.getRight) || isSingleRow(join.getLeft)
-    } else {
-      false
+  private def isSingleRowJoin(join: LogicalJoin): Boolean = {
+    join.getJoinType match {
+      case JoinRelType.INNER if isSingleRow(join.getRight) || isSingleRow(join.getLeft) => true
+      case JoinRelType.LEFT if isSingleRow(join.getRight) => true
+      case JoinRelType.RIGHT if isSingleRow(join.getLeft) => true
+      case _ => false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
index b61573c..f964a95 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
@@ -37,10 +37,11 @@ class DataSetSingleRowJoinRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val join = call.rel(0).asInstanceOf[FlinkLogicalJoin]
 
-    if (isInnerJoin(join)) {
-      isSingleRow(join.getRight) || isSingleRow(join.getLeft)
-    } else {
-      false
+    join.getJoinType match {
+      case JoinRelType.INNER if isSingleRow(join.getLeft) || isSingleRow(join.getRight) => true
+      case JoinRelType.LEFT if isSingleRow(join.getRight) => true
+      case JoinRelType.RIGHT if isSingleRow(join.getLeft) => true
+      case _ => false
     }
   }
 
@@ -79,6 +80,7 @@ class DataSetSingleRowJoinRule
       rel.getRowType,
       join.getCondition,
       join.getRowType,
+      join.getJoinType,
       description)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
index 5f3dbb4..c906b63 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
@@ -19,11 +19,13 @@
 package org.apache.flink.table.runtime
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 
 class MapJoinLeftRunner[IN1, IN2, OUT](
     name: String,
     code: String,
+    outerJoin: Boolean,
     returnType: TypeInformation[OUT],
     broadcastSetName: String)
   extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
@@ -31,7 +33,9 @@ class MapJoinLeftRunner[IN1, IN2, OUT](
   override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit = {
     broadcastSet match {
       case Some(singleInput) => function.join(multiInput, singleInput, out)
+      case None if outerJoin => function.join(multiInput, null.asInstanceOf[IN2], out)
       case None =>
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
index e2d9331..0cb4eda 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
@@ -19,11 +19,13 @@
 package org.apache.flink.table.runtime
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 
 class MapJoinRightRunner[IN1, IN2, OUT](
     name: String,
     code: String,
+    outerJoin: Boolean,
     returnType: TypeInformation[OUT],
     broadcastSetName: String)
   extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
@@ -31,7 +33,9 @@ class MapJoinRightRunner[IN1, IN2, OUT](
   override def flatMap(multiInput: IN2, out: Collector[OUT]): Unit = {
     broadcastSet match {
       case Some(singleInput) => function.join(singleInput, multiInput, out)
+      case None if outerJoin => function.join(null.asInstanceOf[IN1], multiInput, out)
       case None =>
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
new file mode 100644
index 0000000..1f7ce84
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class DataSetSingleRowJoinTest extends TableTestBase {
+
+  @Test
+  def testSingleRowCrossJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Int)]("A", 'a1, 'a2)
+
+    val query =
+      "SELECT a1, asum " +
+      "FROM A, (SELECT sum(a1) + sum(a2) AS asum FROM A)"
+
+    val expected =
+      binaryNode(
+        "DataSetSingleRowJoin",
+        unaryNode(
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "a1")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                batchTableNode(0),
+                tuples(List(null, null)),
+                term("values", "a1", "a2")
+              ),
+              term("union","a1","a2")
+            ),
+            term("select", "SUM(a1) AS $f0", "SUM(a2) AS $f1")
+          ),
+          term("select", "+($f0, $f1) AS asum")
+        ),
+        term("where", "true"),
+        term("join", "a1", "asum"),
+        term("joinType", "NestedLoopInnerJoin")
+      )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testSingleRowEquiJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+    val query =
+      "SELECT a1, a2 " +
+      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+      "WHERE a1 = cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        binaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)),
+                term("values", "a1")
+              ),
+              term("union","a1")
+            ),
+            term("select", "COUNT(a1) AS cnt")
+          ),
+          term("where", "=(CAST(a1), cnt)"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopInnerJoin")
+        ),
+        term("select", "a1", "a2")
+      )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testSingleRowNotEquiJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+    val query =
+      "SELECT a1, a2 " +
+      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+      "WHERE a1 < cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        binaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)),
+                term("values", "a1")
+              ),
+              term("union", "a1")
+            ),
+            term("select", "COUNT(a1) AS cnt")
+          ),
+          term("where", "<(a1, cnt)"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopInnerJoin")
+        ),
+        term("select", "a1", "a2")
+      )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testSingleRowJoinWithComplexPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long)]("A", 'a1, 'a2)
+    util.addTable[(Int, Long)]("B", 'b1, 'b2)
+
+    val query =
+      "SELECT a1, a2, b1, b2 " +
+        "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
+        "WHERE a1 < b1 AND a2 = b2"
+
+    val expected = binaryNode(
+      "DataSetSingleRowJoin",
+      batchTableNode(0),
+      unaryNode(
+        "DataSetAggregate",
+        unaryNode(
+          "DataSetUnion",
+          unaryNode(
+            "DataSetValues",
+            batchTableNode(1),
+            tuples(List(null, null)),
+            term("values", "b1", "b2")
+          ),
+          term("union","b1","b2")
+        ),
+        term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
+      ),
+      term("where", "AND(<(a1, b1)", "=(a2, b2))"),
+      term("join", "a1", "a2", "b1", "b2"),
+      term("joinType", "NestedLoopInnerJoin")
+    )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testRightSingleLeftJoinEqualPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Int)]("A", 'a1, 'a2)
+    util.addTable[(Int, Int)]("B", 'b1, 'b2)
+
+    val queryLeftJoin =
+      "SELECT a2 " +
+        "FROM A " +
+        "  LEFT JOIN " +
+        "(SELECT COUNT(*) AS cnt FROM B) AS x " +
+        "  ON a1 = cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          term("where", "=(a1, cnt)"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopLeftJoin")
+        ),
+        term("select", "a2")
+      ) + "\n" +
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetUnion",
+            unaryNode(
+              "DataSetValues",
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "0 AS $f0")),
+              tuples(List(null)), term("values", "$f0")
+            ),
+            term("union", "$f0")
+          ),
+          term("select", "COUNT(*) AS cnt")
+        )
+
+    util.verifySql(queryLeftJoin, expected)
+  }
+
+  @Test
+  def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Int)]("A", 'a1, 'a2)
+    util.addTable[(Int, Int)]("B", 'b1, 'b2)
+
+    val queryLeftJoin =
+      "SELECT a2 " +
+        "FROM A " +
+        "  LEFT JOIN " +
+        "(SELECT COUNT(*) AS cnt FROM B) AS x " +
+        "  ON a1 > cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          term("where", ">(a1, cnt)"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopLeftJoin")
+        ),
+        term("select", "a2")
+      ) + "\n" +
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetUnion",
+            unaryNode(
+              "DataSetValues",
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "0 AS $f0")),
+              tuples(List(null)), term("values", "$f0")
+            ),
+            term("union", "$f0")
+          ),
+          term("select", "COUNT(*) AS cnt")
+        )
+
+    util.verifySql(queryLeftJoin, expected)
+  }
+
+  @Test
+  def testLeftSingleRightJoinEqualPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Long)]("A", 'a1, 'a2)
+    util.addTable[(Long, Long)]("B", 'b1, 'b2)
+
+    val queryRightJoin =
+      "SELECT a1 " +
+        "FROM (SELECT COUNT(*) AS cnt FROM B) " +
+        "  RIGHT JOIN " +
+        "A " +
+        "  ON cnt = a2"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          "",
+          term("where", "=(cnt, a2)"),
+          term("join", "cnt", "a1", "a2"),
+          term("joinType", "NestedLoopRightJoin")
+        ),
+        term("select", "a1")
+      ) + unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetUnion",
+            unaryNode(
+              "DataSetValues",
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "0 AS $f0")),
+              tuples(List(null)), term("values", "$f0")
+            ),
+            term("union", "$f0")
+          ),
+          term("select", "COUNT(*) AS cnt")
+        ) + "\n" +
+        batchTableNode(0)
+
+    util.verifySql(queryRightJoin, expected)
+  }
+
+  @Test
+  def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Long)]("A", 'a1, 'a2)
+    util.addTable[(Long, Long)]("B", 'b1, 'b2)
+
+    val queryRightJoin =
+      "SELECT a1 " +
+        "FROM (SELECT COUNT(*) AS cnt FROM B) " +
+        "  RIGHT JOIN " +
+        "A " +
+        "  ON cnt < a2"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          "",
+          term("where", "<(cnt, a2)"),
+          term("join", "cnt", "a1", "a2"),
+          term("joinType", "NestedLoopRightJoin")
+        ),
+        term("select", "a1")
+      ) +
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetUnion",
+            unaryNode(
+              "DataSetValues",
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "0 AS $f0")),
+              tuples(List(null)), term("values", "$f0")
+            ),
+            term("union", "$f0")
+          ),
+          term("select", "COUNT(*) AS cnt")
+        ) + "\n" +
+        batchTableNode(0)
+
+    util.verifySql(queryRightJoin, expected)
+  }
+
+  @Test
+  def testSingleRowJoinInnerJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Int)]("A", 'a1, 'a2)
+    val query =
+      "SELECT a2, sum(a1) " +
+        "FROM A " +
+        "GROUP BY a2 " +
+        "HAVING sum(a1) > (SELECT sum(a1) * 0.1 FROM A)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetSingleRowJoin",
+          unaryNode(
+            "DataSetAggregate",
+            batchTableNode(0),
+            term("groupBy", "a2"),
+            term("select", "a2", "SUM(a1) AS EXPR$1")
+          ),
+          term("where", ">(EXPR$1, EXPR$0)"),
+          term("join", "a2", "EXPR$1", "EXPR$0"),
+          term("joinType", "NestedLoopInnerJoin")
+        ),
+        term("select", "a2", "EXPR$1")
+      ) + "\n" +
+        unaryNode(
+          "DataSetCalc",
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)), term("values", "a1")
+              ),
+              term("union", "a1")
+            ),
+            term("select", "SUM(a1) AS $f0")
+          ),
+          term("select", "*($f0, 0.1) AS EXPR$0")
+        )
+
+    util.verifySql(query, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
index 54b4d24..85cfb18 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala
@@ -211,7 +211,7 @@ class DistinctAggregateTest extends TableTestBase {
       ),
       term("where", "true"),
       term("join", "EXPR$0", "EXPR$1"),
-      term("joinType", "NestedLoopJoin")
+      term("joinType", "NestedLoopInnerJoin")
     )
 
     util.verifySql(sqlQuery, expected)
@@ -268,7 +268,7 @@ class DistinctAggregateTest extends TableTestBase {
           ),
           term("where", "true"),
           term("join", "EXPR$2, EXPR$0"),
-          term("joinType", "NestedLoopJoin")
+          term("joinType", "NestedLoopInnerJoin")
         ),
         unaryNode(
           "DataSetAggregate",
@@ -294,7 +294,7 @@ class DistinctAggregateTest extends TableTestBase {
         ),
         term("where", "true"),
         term("join", "EXPR$2", "EXPR$0, EXPR$1"),
-        term("joinType", "NestedLoopJoin")
+        term("joinType", "NestedLoopInnerJoin")
       ),
       term("select", "EXPR$0, EXPR$1, EXPR$2")
     )

http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
index 8a8c0ce..3ed44a3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -40,10 +40,8 @@ class JoinITCase(
 
   @Test
   def testJoin(): Unit = {
-
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
@@ -280,8 +278,6 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    tEnv.sql(sqlQuery).toDataSet[Row].collect()
-
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
       "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
       "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
@@ -304,8 +300,6 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    tEnv.sql(sqlQuery).toDataSet[Row].collect()
-
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
       "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
       "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
@@ -372,11 +366,184 @@ class JoinITCase(
 
     val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
     val result = tEnv.sql(sqlQuery1).count()
-
     Assert.assertEquals(0, result)
   }
 
   @Test
+  def testLeftNullRightJoin(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT a, cnt " +
+      "FROM (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) RIGHT JOIN A ON a < cnt"
+
+    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("A", ds1)
+    tEnv.registerTable("B", ds2)
+
+
+    val result = tEnv.sql(sqlQuery)
+    val expected = Seq(
+          "1,null",
+          "2,null", "2,null",
+          "3,null", "3,null", "3,null",
+          "4,null", "4,null", "4,null", "4,null",
+          "5,null", "5,null", "5,null", "5,null", "5,null").mkString("\n")
+
+    val results = result.toDataSet[Row].collect()
+
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+  @Test
+  def testLeftSingleRightJoinEqualPredicate(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt = a"
+
+    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("A", ds1)
+    tEnv.registerTable("B", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+    val expected = Seq(
+      "1,null", "2,null", "2,null", "3,3", "3,3",
+      "3,3", "4,null", "4,null", "4,null",
+      "4,null", "5,null", "5,null", "5,null",
+      "5,null", "5,null").mkString("\n")
+
+    val results = result.toDataSet[Row].collect()
+
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testLeftSingleRightJoinNotEqualPredicate(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT a, cnt FROM (SELECT COUNT(*) AS cnt FROM B) RIGHT JOIN A ON cnt > a"
+
+    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("A", ds1)
+    tEnv.registerTable("B", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+    val expected = Seq(
+      "1,3", "2,3", "2,3", "3,null", "3,null",
+      "3,null", "4,null", "4,null", "4,null",
+      "4,null", "5,null", "5,null", "5,null",
+      "5,null", "5,null").mkString("\n")
+
+    val results = result.toDataSet[Row].collect()
+
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRightNullLeftJoin(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT a, cnt " +
+      "FROM A LEFT JOIN (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) ON cnt > a"
+
+    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("A", ds2)
+    tEnv.registerTable("B", ds1)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = Seq(
+      "2,null", "3,null", "1,null").mkString("\n")
+
+    val results = result.toDataSet[Row].collect()
+
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRightSingleLeftJoinEqualPredicate(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT a, cnt FROM A LEFT JOIN (SELECT COUNT(*) AS cnt FROM B) ON cnt = a"
+
+    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("A", ds1)
+    tEnv.registerTable("B", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = Seq(
+      "1,null", "2,null", "2,null", "3,3", "3,3",
+      "3,3", "4,null", "4,null", "4,null",
+      "4,null", "5,null", "5,null", "5,null",
+      "5,null", "5,null").mkString("\n")
+
+    val results = result.toDataSet[Row].collect()
+
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRightSingleLeftJoinNotEqualPredicate(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT a, cnt FROM A LEFT JOIN (SELECT COUNT(*) AS cnt FROM B) ON cnt < a"
+
+    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("A", ds1)
+    tEnv.registerTable("B", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = Seq(
+      "1,null", "2,null", "2,null", "3,null", "3,null",
+      "3,null", "4,3", "4,3", "4,3",
+      "4,3", "5,3", "5,3", "5,3",
+      "5,3", "5,3").mkString("\n")
+
+    val results = result.toDataSet[Row].collect()
+
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRightSingleLeftJoinTwoFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    val sqlQuery =
+      "SELECT a, cnt, cnt2 " +
+      "FROM t1 LEFT JOIN (SELECT COUNT(*) AS cnt,COUNT(*) AS cnt2 FROM t2 ) AS x ON a = cnt"
+
+    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("t1", ds1)
+    tEnv.registerTable("t2", ds2)
+
+    val result = tEnv.sql(sqlQuery)
+    val expected = Seq(
+      "1,null,null",
+      "2,null,null", "2,null,null",
+      "3,3,3", "3,3,3", "3,3,3",
+      "4,null,null", "4,null,null", "4,null,null", "4,null,null",
+      "5,null,null", "5,null,null", "5,null,null", "5,null,null", "5,null,null").mkString("\n")
+
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testCrossWithUnnest(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/ef751b2a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
deleted file mode 100644
index 27e3853..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SingleRowJoinTest.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class SingleRowJoinTest extends TableTestBase {
-
-  @Test
-  def testSingleRowJoinWithCalcInput(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Int)]("A", 'a1, 'a2)
-
-    val query =
-      "SELECT a1, asum " +
-      "FROM A, (SELECT sum(a1) + sum(a2) AS asum FROM A)"
-
-    val expected =
-      binaryNode(
-        "DataSetSingleRowJoin",
-        unaryNode(
-          "DataSetCalc",
-          batchTableNode(0),
-          term("select", "a1")
-        ),
-        unaryNode(
-          "DataSetCalc",
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                batchTableNode(0),
-                tuples(List(null, null)),
-                term("values", "a1", "a2")
-              ),
-              term("union","a1","a2")
-            ),
-            term("select", "SUM(a1) AS $f0", "SUM(a2) AS $f1")
-          ),
-          term("select", "+($f0, $f1) AS asum")
-        ),
-        term("where", "true"),
-        term("join", "a1", "asum"),
-        term("joinType", "NestedLoopJoin")
-      )
-
-    util.verifySql(query, expected)
-  }
-
-  @Test
-  def testSingleRowEquiJoin(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, String)]("A", 'a1, 'a2)
-
-    val query =
-      "SELECT a1, a2 " +
-      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
-      "WHERE a1 = cnt"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        binaryNode(
-          "DataSetSingleRowJoin",
-          batchTableNode(0),
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(0),
-                  term("select", "a1")
-                ),
-                tuples(List(null)),
-                term("values", "a1")
-              ),
-              term("union","a1")
-            ),
-            term("select", "COUNT(a1) AS cnt")
-          ),
-          term("where", "=(CAST(a1), cnt)"),
-          term("join", "a1", "a2", "cnt"),
-          term("joinType", "NestedLoopJoin")
-        ),
-        term("select", "a1", "a2")
-      )
-
-    util.verifySql(query, expected)
-  }
-
-  @Test
-  def testSingleRowNotEquiJoin(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, String)]("A", 'a1, 'a2)
-
-    val query =
-      "SELECT a1, a2 " +
-      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
-      "WHERE a1 < cnt"
-
-    val expected =
-      unaryNode(
-        "DataSetCalc",
-        binaryNode(
-          "DataSetSingleRowJoin",
-          batchTableNode(0),
-          unaryNode(
-            "DataSetAggregate",
-            unaryNode(
-              "DataSetUnion",
-              unaryNode(
-                "DataSetValues",
-                unaryNode(
-                  "DataSetCalc",
-                  batchTableNode(0),
-                  term("select", "a1")
-                ),
-                tuples(List(null)),
-                term("values", "a1")
-              ),
-              term("union", "a1")
-            ),
-            term("select", "COUNT(a1) AS cnt")
-          ),
-          term("where", "<(a1, cnt)"),
-          term("join", "a1", "a2", "cnt"),
-          term("joinType", "NestedLoopJoin")
-        ),
-        term("select", "a1", "a2")
-      )
-
-    util.verifySql(query, expected)
-  }
-
-  @Test
-  def testSingleRowJoinWithComplexPredicate(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long)]("A", 'a1, 'a2)
-    util.addTable[(Int, Long)]("B", 'b1, 'b2)
-
-    val query =
-      "SELECT a1, a2, b1, b2 " +
-        "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
-        "WHERE a1 < b1 AND a2 = b2"
-
-    val expected = binaryNode(
-      "DataSetSingleRowJoin",
-      batchTableNode(0),
-      unaryNode(
-        "DataSetAggregate",
-        unaryNode(
-          "DataSetUnion",
-          unaryNode(
-            "DataSetValues",
-            batchTableNode(1),
-            tuples(List(null, null)),
-            term("values", "b1", "b2")
-          ),
-          term("union","b1","b2")
-        ),
-        term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
-      ),
-      term("where", "AND(<(a1, b1)", "=(a2, b2))"),
-      term("join", "a1", "a2", "b1", "b2"),
-      term("joinType", "NestedLoopJoin")
-    )
-
-    util.verifySql(query, expected)
-  }
-}