You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/01 19:47:54 UTC

flink git commit: [FLINK-5159] [table] Add single-row broadcast nested-loop join.

Repository: flink
Updated Branches:
  refs/heads/master b181662be -> dbe707324


[FLINK-5159] [table] Add single-row broadcast nested-loop join.

- Support for arbitrary joins (cross, theta, equi) if one input has exactly one row.

This closes #2811.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbe70732
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbe70732
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbe70732

Branch: refs/heads/master
Commit: dbe70732434ff1396e1829080cd14f26b691489a
Parents: b181662
Author: Alexander Shoshin <Al...@epam.com>
Authored: Wed Nov 9 11:27:57 2016 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 1 20:42:10 2016 +0100

----------------------------------------------------------------------
 .../nodes/dataset/DataSetSingleRowJoin.scala    | 193 +++++++++++++++++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |   1 +
 .../dataSet/DataSetSingleRowJoinRule.scala      |  82 ++++++++
 .../api/table/runtime/MapJoinLeftRunner.scala   |  33 ++++
 .../api/table/runtime/MapJoinRightRunner.scala  |  33 ++++
 .../api/table/runtime/MapSideJoinRunner.scala   |  51 +++++
 .../flink/api/scala/batch/sql/JoinITCase.scala  |  48 +++++
 .../api/scala/batch/sql/SingleRowJoinTest.scala | 146 ++++++++++++++
 8 files changed, 587 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbe70732/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
new file mode 100644
index 0000000..0306c00
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val child = if (leftIsSingle) {
+      this.getRight
+    } else {
+      this.getLeft
+    }
+    val rowCnt = metadata.getRowCount(child)
+    val rowSize = this.estimateRowSize(child.getRowType)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val broadcastSetName = "joinSet"
+    val mapSideJoin = generateMapFunction(
+      tableEnv.getConfig,
+      leftDataSet.getType,
+      rightDataSet.getType,
+      leftIsSingle,
+      joinCondition,
+      broadcastSetName,
+      expectedType)
+
+    val (multiRowDataSet, singleRowDataSet) =
+      if (leftIsSingle) {
+        (rightDataSet, leftDataSet)
+      } else {
+        (leftDataSet, rightDataSet)
+      }
+
+    multiRowDataSet
+      .flatMap(mapSideJoin)
+      .withBroadcastSet(singleRowDataSet, broadcastSetName)
+      .name(getMapOperatorName)
+      .asInstanceOf[DataSet[Any]]
+  }
+
+  private def generateMapFunction(
+      config: TableConfig,
+      inputType1: TypeInformation[Any],
+      inputType2: TypeInformation[Any],
+      firstIsSingle: Boolean,
+      joinCondition: RexNode,
+      broadcastInputSetName: String,
+      expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
+
+    val codeGenerator = new CodeGenerator(
+      config,
+      false,
+      inputType1,
+      Some(inputType2))
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val conversion = codeGenerator.generateConverterResultExpression(
+      returnType,
+      joinRowType.getFieldNames)
+
+    val condition = codeGenerator.generateExpression(joinCondition)
+
+    val joinMethodBody = s"""
+                  |${condition.code}
+                  |if (${condition.resultTerm}) {
+                  |  ${conversion.code}
+                  |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+                  |}
+                  |""".stripMargin
+
+    val genFunction = codeGenerator.generateFunction(
+      ruleDescription,
+      classOf[FlatJoinFunction[Any, Any, Any]],
+      joinMethodBody,
+      returnType)
+
+    if (firstIsSingle) {
+      new MapJoinRightRunner[Any, Any, Any](
+        genFunction.name,
+        genFunction.code,
+        genFunction.returnType,
+        broadcastInputSetName)
+    } else {
+      new MapJoinLeftRunner[Any, Any, Any](
+        genFunction.name,
+        genFunction.code,
+        genFunction.returnType,
+        broadcastInputSetName)
+    }
+  }
+
+  private def getMapOperatorName: String = {
+    s"where: ($joinConditionToString), join: ($joinSelectionToString)"
+  }
+
+  private def joinSelectionToString: String = {
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+  private def joinConditionToString: String = {
+    val inFields = joinRowType.getFieldNames.asScala.toList
+    getExpressionString(joinCondition, inFields, None)
+  }
+
+  private def joinTypeToString: String = {
+    "NestedLoopJoin"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe70732/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 26c025e..9e20df4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -101,6 +101,7 @@ object FlinkRuleSets {
     DataSetAggregateWithNullValuesRule.INSTANCE,
     DataSetCalcRule.INSTANCE,
     DataSetJoinRule.INSTANCE,
+    DataSetSingleRowJoinRule.INSTANCE,
     DataSetScanRule.INSTANCE,
     DataSetUnionRule.INSTANCE,
     DataSetIntersectRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe70732/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
new file mode 100644
index 0000000..8109fcf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin}
+
+class DataSetSingleRowJoinRule
+  extends ConverterRule(
+      classOf[LogicalJoin],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+    if (isInnerJoin(join)) {
+      isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+        isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+    } else {
+      false
+    }
+  }
+
+  private def isInnerJoin(join: LogicalJoin) = {
+    join.getJoinType == JoinRelType.INNER
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+    node.isInstanceOf[LogicalAggregate] &&
+      isSingleRow(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleRow(agg: LogicalAggregate) = {
+    agg.getGroupSet.isEmpty
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val join = rel.asInstanceOf[LogicalJoin]
+    val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE)
+    val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE)
+    val leftIsSingle = isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+
+    new DataSetSingleRowJoin(
+      rel.getCluster,
+      traitSet,
+      dataSetLeftNode,
+      dataSetRightNode,
+      leftIsSingle,
+      rel.getRowType,
+      join.getCondition,
+      join.getRowType,
+      description)
+  }
+}
+
+object DataSetSingleRowJoinRule {
+  val INSTANCE: RelOptRule = new DataSetSingleRowJoinRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe70732/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
new file mode 100644
index 0000000..76650c2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinLeftRunner.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.Collector
+
+class MapJoinLeftRunner[IN1, IN2, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
+
+  override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit =
+    function.join(multiInput, singleInput, out)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe70732/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
new file mode 100644
index 0000000..52b01cf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapJoinRightRunner.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.util.Collector
+
+class MapJoinRightRunner[IN1, IN2, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
+
+  override def flatMap(multiInput: IN2, out: Collector[OUT]): Unit =
+    function.join(singleInput, multiInput, out)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe70732/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
new file mode 100644
index 0000000..b355d49
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapSideJoinRunner.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT],
+    broadcastSetName: String)
+  extends RichFlatMapFunction[MULTI_IN, OUT]
+    with ResultTypeQueryable[OUT]
+    with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  protected var function: FlatJoinFunction[IN1, IN2, OUT] = _
+  protected var singleInput: SINGLE_IN = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatJoinFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatJoinFunction.")
+    function = clazz.newInstance()
+    singleInput = getRuntimeContext.getBroadcastVariable(broadcastSetName).get(0)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe70732/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
index 6a02fd4..68f63c3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/JoinITCase.scala
@@ -314,4 +314,52 @@ class JoinITCase(
     val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test(expected = classOf[TableException])
+  def testCrossJoin(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val table1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+    val table2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('b1, 'b2, 'b3)
+    tEnv.registerTable("A", table1)
+    tEnv.registerTable("B", table2)
+
+    val sqlQuery = "SELECT a1, b1 FROM A CROSS JOIN B"
+    tEnv.sql(sqlQuery).count
+  }
+
+  @Test
+  def testCrossJoinWithLeftSingleRowInput(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+    tEnv.registerTable("A", table)
+
+    val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
+    val expected =
+      "3,1,1,Hi\n" +
+      "3,2,2,Hello\n" +
+      "3,3,2,Hello world"
+    val result = tEnv.sql(sqlQuery2).collect()
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testCrossJoinWithRightSingleRowInput(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
+    tEnv.registerTable("A", table)
+
+    val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A)"
+    val expected =
+      "1,1,Hi,3\n" +
+      "2,2,Hello,3\n" +
+      "3,2,Hello world,3"
+    val result = tEnv.sql(sqlQuery1).collect()
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe70732/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
new file mode 100644
index 0000000..f56b9ae
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+class SingleRowJoinTest extends TableTestBase {
+
+  @Test
+  def testSingleRowEquiJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+    val query =
+      "SELECT a1, a2 " +
+      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+      "WHERE a1 = cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        binaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                batchTableNode(0),
+                tuples(List(null, null)),
+                term("values", "a1", "a2")
+              ),
+              term("union","a1","a2")
+            ),
+            term("select", "COUNT(a1) AS cnt")
+          ),
+          term("where", "true"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopJoin")
+        ),
+        term("select", "a1", "a2"),
+        term("where", "=(CAST(a1), cnt)")
+      )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testSingleRowNotEquiJoin(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, String)]("A", 'a1, 'a2)
+
+    val query =
+      "SELECT a1, a2 " +
+      "FROM A, (SELECT count(a1) AS cnt FROM A) " +
+      "WHERE a1 < cnt"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        binaryNode(
+          "DataSetSingleRowJoin",
+          batchTableNode(0),
+          unaryNode(
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetUnion",
+              unaryNode(
+                "DataSetValues",
+                batchTableNode(0),
+                tuples(List(null, null)),
+                term("values", "a1", "a2")
+              ),
+              term("union","a1","a2")
+            ),
+            term("select", "COUNT(a1) AS cnt")
+          ),
+          term("where", "true"),
+          term("join", "a1", "a2", "cnt"),
+          term("joinType", "NestedLoopJoin")
+        ),
+        term("select", "a1", "a2"),
+        term("where", "<(a1, cnt)")
+      )
+
+    util.verifySql(query, expected)
+  }
+
+  @Test
+  def testSingleRowJoinWithComplexPredicate(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long)]("A", 'a1, 'a2)
+    util.addTable[(Int, Long)]("B", 'b1, 'b2)
+
+    val query =
+      "SELECT a1, a2, b1, b2 " +
+        "FROM A, (SELECT min(b1) AS b1, max(b2) AS b2 FROM B) " +
+        "WHERE a1 < b1 AND a2 = b2"
+
+    val expected = binaryNode(
+      "DataSetSingleRowJoin",
+      batchTableNode(0),
+      unaryNode(
+        "DataSetAggregate",
+        unaryNode(
+          "DataSetUnion",
+          unaryNode(
+            "DataSetValues",
+            batchTableNode(1),
+            tuples(List(null, null)),
+            term("values", "b1", "b2")
+          ),
+          term("union","b1","b2")
+        ),
+        term("select", "MIN(b1) AS b1", "MAX(b2) AS b2")
+      ),
+      term("where", "AND(<(a1, b1)", "=(a2, b2))"),
+      term("join", "a1", "a2", "b1", "b2"),
+      term("joinType", "NestedLoopJoin")
+    )
+
+    util.verifySql(query, expected)
+  }
+}