Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:49 UTC

[20/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
new file mode 100644
index 0000000..fc69493
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.runtime.FlatMapRunner
+import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.table.api.{TableConfig, TableException}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Joins an input relation with the results of a user-defined table function.
+  */
+trait FlinkCorrelate {
+
+  private[flink] def functionBody(
+      generator: CodeGenerator,
+      udtfTypeInfo: TypeInformation[Any],
+      rowType: RelDataType,
+      rexCall: RexCall,
+      condition: Option[RexNode],
+      config: TableConfig,
+      joinType: SemiJoinType,
+      expectedType: Option[TypeInformation[Any]]): String = {
+
+    val returnType = determineReturnType(
+      rowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+
+    val call = generator.generateExpression(rexCall)
+    var body =
+      s"""
+         |${call.code}
+         |java.util.Iterator iter = ${call.resultTerm}.getRowsIterator();
+       """.stripMargin
+
+    if (joinType == SemiJoinType.INNER) {
+      // cross join
+      body +=
+        s"""
+           |if (!iter.hasNext()) {
+           |  return;
+           |}
+        """.stripMargin
+    } else if (joinType == SemiJoinType.LEFT) {
+      // left outer join
+
+      // in case of a left outer join, emit the input row padded with nulls
+      // if the table function returns an empty result
+      val input2NullExprs = input2AccessExprs.map { x =>
+        GeneratedExpression(
+          primitiveDefaultValue(x.resultType),
+          ALWAYS_NULL,
+          NO_CODE,
+          x.resultType)
+      }
+      val outerResultExpr = generator.generateResultExpression(
+        input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
+      body +=
+        s"""
+           |if (!iter.hasNext()) {
+           |  ${outerResultExpr.code}
+           |  ${generator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+           |  return;
+           |}
+        """.stripMargin
+    } else {
+      throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+    }
+
+    val crossResultExpr = generator.generateResultExpression(
+      input1AccessExprs ++ input2AccessExprs,
+      returnType,
+      rowType.getFieldNames.asScala)
+
+    val projection = if (condition.isEmpty) {
+      s"""
+         |${crossResultExpr.code}
+         |${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
+       """.stripMargin
+    } else {
+      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
+      filterGenerator.input1Term = filterGenerator.input2Term
+      val filterCondition = filterGenerator.generateExpression(condition.get)
+      s"""
+         |${filterGenerator.reuseInputUnboxingCode()}
+         |${filterCondition.code}
+         |if (${filterCondition.resultTerm}) {
+         |  ${crossResultExpr.code}
+         |  ${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
+         |}
+         |""".stripMargin
+    }
+
+    val outputTypeClass = udtfTypeInfo.getTypeClass.getCanonicalName
+    body +=
+      s"""
+         |while (iter.hasNext()) {
+         |  $outputTypeClass ${generator.input2Term} = ($outputTypeClass) iter.next();
+         |  $projection
+         |}
+       """.stripMargin
+    body
+  }
+
+  private[flink] def correlateMapFunction(
+      genFunction: GeneratedFunction[FlatMapFunction[Any, Any]])
+    : FlatMapRunner[Any, Any] = {
+
+    new FlatMapRunner[Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+  }
+
+  private[flink] def selectToString(rowType: RelDataType): String = {
+    rowType.getFieldNames.asScala.mkString(",")
+  }
+
+  private[flink] def correlateOpName(
+      rexCall: RexCall,
+      sqlFunction: TableSqlFunction,
+      rowType: RelDataType)
+    : String = {
+
+    s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
+  }
+
+  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
+    val udtfName = sqlFunction.getName
+    val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
+    s"table($udtfName($operands))"
+  }
+
+}

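A minimal sketch of the two join semantics the generated body distinguishes, assuming a hypothetical Split table function (not part of this commit):

    import org.apache.flink.table.functions.TableFunction

    // hypothetical UDTF that emits zero or more tokens per input string
    class Split extends TableFunction[String] {
      def eval(s: String): Unit = s.split(",").foreach(collect)
    }

    // SemiJoinType.INNER: input rows for which the UDTF emits nothing are
    // dropped (the generated body returns early on an empty iterator).
    // SemiJoinType.LEFT: such rows are emitted once, with the UDTF fields
    // padded with nulls (see input2NullExprs above).
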
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
new file mode 100644
index 0000000..c8211a2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.MapRunner
+
+import scala.collection.JavaConversions._
+
+trait FlinkRel {
+
+  private[flink] def getExpressionString(
+      expr: RexNode,
+      inFields: List[String],
+      localExprsTable: Option[List[RexNode]]): String = {
+
+    expr match {
+      case i: RexInputRef =>
+        inFields.get(i.getIndex)
+
+      case l: RexLiteral =>
+        l.toString
+
+      case _: RexLocalRef if localExprsTable.isEmpty =>
+        throw new IllegalArgumentException(
+          "Encountered RexLocalRef without a local expression table")
+
+      case l: RexLocalRef =>
+        val lExpr = localExprsTable.get(l.getIndex)
+        getExpressionString(lExpr, inFields, localExprsTable)
+
+      case c: RexCall =>
+        val op = c.getOperator.toString
+        val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable))
+        s"$op(${ops.mkString(", ")})"
+
+      case fa: RexFieldAccess =>
+        val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable)
+        val field = fa.getField.getName
+        s"$referenceExpr.$field"
+
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
+    }
+  }
+
+  private[flink] def getConversionMapper(
+      config: TableConfig,
+      nullableInput: Boolean,
+      inputType: TypeInformation[Any],
+      expectedType: TypeInformation[Any],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputPojoFieldMapping: Option[Array[Int]] = None)
+    : MapFunction[Any, Any] = {
+
+    val generator = new CodeGenerator(
+      config,
+      nullableInput,
+      inputType,
+      None,
+      inputPojoFieldMapping)
+    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    val genFunction = generator.generateFunction(
+      conversionOperatorName,
+      classOf[MapFunction[Any, Any]],
+      body,
+      expectedType)
+
+    new MapRunner[Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+  }
+}

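A worked example of getExpressionString, with assumed RexNode inputs:

    // given inFields = List("a", "b") and
    // localExprsTable = Some(List($0, $1, +($0, 3))):
    //
    //   RexInputRef(0)       -> "a"
    //   RexLiteral(3)        -> "3"
    //   RexLocalRef(2)       -> "+(a, 3)"    (resolved through the table)
    //   RexCall AND($0, $1)  -> "AND(a, b)"
    //   RexFieldAccess $0.f  -> "a.f"
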
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
new file mode 100644
index 0000000..252bb2e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+abstract class BatchScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable)
+  extends TableScan(cluster, traitSet, table)
+  with DataSetRel {
+
+  override def toString: String = {
+    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+
+    val rowCnt = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+  }
+
+  protected def convertToExpectedType(
+      input: DataSet[Any],
+      flinkTable: FlinkTable[_],
+      expectedType: Option[TypeInformation[Any]],
+      config: TableConfig): DataSet[Any] = {
+
+    val inputType = input.getType
+
+    expectedType match {
+
+      // special case:
+      // if efficient type usage is enabled and no expected type is set,
+      // we can simply forward the DataSet to the next operator.
+      // however, we cannot forward PojoTypes as their fields have no deterministic order
+      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
+        input
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != inputType) {
+
+          val mapFunc = getConversionMapper(
+            config,
+            nullableInput = false,
+            inputType,
+            determinedType,
+            "DataSetSourceConversion",
+            getRowType.getFieldNames,
+            Some(flinkTable.fieldIndexes))
+
+          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          input.map(mapFunc).name(opName)
+        }
+        // no conversion necessary, forward
+        else {
+          input
+        }
+    }
+  }
+}

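To summarize the conversion logic above (type names illustrative):

    // expectedType = None, efficient type usage enabled, non-POJO input:
    //   -> the DataSet is forwarded unchanged (fast path)
    // expectedType = None, input is a PojoTypeInfo:
    //   -> converted, because POJO fields have no deterministic order
    // determinedType == inputType:
    //   -> forwarded; otherwise a generated "DataSetSourceConversion"
    //      MapFunction is appended
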
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
new file mode 100644
index 0000000..09cb180
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.BatchTableSource
+
+/** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
+class BatchTableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    val tableSource: BatchTableSource[_])
+  extends BatchScan(cluster, traitSet, table) {
+
+  override def deriveRowType() = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rowCnt = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new BatchTableSourceScan(
+      cluster,
+      traitSet,
+      getTable,
+      tableSource
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("fields", tableSource.getFieldsNames.mkString(", "))
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+    val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
+
+    convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
+  }
+}

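A minimal BatchTableSource sketch showing the methods the scan relies on (getDataSet, getFieldsNames, getFieldTypes); the class and its data are hypothetical, and the member set follows this Flink version's TableSource trait:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    import org.apache.flink.table.sources.BatchTableSource
    import org.apache.flink.types.Row

    class WordCountSource extends BatchTableSource[Row] {

      override def getFieldsNames: Array[String] = Array("word", "frequency")

      override def getFieldTypes: Array[TypeInformation[_]] =
        Array(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)

      override def getReturnType: TypeInformation[Row] =
        new RowTypeInfo(getFieldTypes: _*)

      override def getNumberOfFields: Int = 2

      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
        execEnv.fromCollection(
          java.util.Arrays.asList(
            Row.of("hello", Int.box(1)),
            Row.of("world", Int.box(2))),
          getReturnType)
    }
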
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
new file mode 100644
index 0000000..a5c42d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.runtime.aggregate.AggregateUtil
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.typeutils.TypeConverter
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which translates a LogicalAggregate into a DataSet aggregation.
+  */
+class DataSetAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      getRowType,
+      inputType,
+      grouping)
+  }
+
+  override def toString: String = {
+    s"Aggregate(${ if (grouping.nonEmpty) {
+      s"groupBy: (${groupingToString(inputType, grouping)}), "
+    } else {
+      ""
+    }}select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("groupBy", groupingToString(inputType, grouping), grouping.nonEmpty)
+      .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    val rowSize = this.estimateRowSize(child.getRowType)
+    val aggCnt = this.namedAggregates.size
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val groupingKeys = grouping.indices.toArray
+
+    val mapFunction = AggregateUtil.createPrepareMapFunction(
+      namedAggregates,
+      grouping,
+      inputType)
+
+    val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
+      namedAggregates,
+      inputType,
+      rowRelDataType,
+      grouping)
+
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
+      tableEnv,
+      // tell the input operator that this operator currently only supports Rows as input
+      Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+    // get the output types
+    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+      .toArray
+
+    val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
+    val prepareOpName = s"prepare select: ($aggString)"
+    val mappedInput = inputDS
+      .map(mapFunction)
+      .name(prepareOpName)
+
+    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
+
+    val result = {
+      if (groupingKeys.length > 0) {
+        // grouped aggregation
+        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+          s"select: ($aggString)"
+
+        mappedInput.asInstanceOf[DataSet[Row]]
+          .groupBy(groupingKeys: _*)
+          .reduceGroup(groupReduceFunction)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataSet[Any]]
+      }
+      else {
+        // global aggregation
+        val aggOpName = s"select: ($aggString)"
+        mappedInput.asInstanceOf[DataSet[Row]]
+          .reduceGroup(groupReduceFunction)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataSet[Any]]
+      }
+    }
+
+    // if the expected type is not a Row, inject a mapper to convert to the expected type
+    expectedType match {
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+        result.map(getConversionMapper(
+          config = config,
+          nullableInput = false,
+          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+          expectedType = expectedType.get,
+          conversionOperatorName = "DataSetAggregateConversion",
+          fieldNames = getRowType.getFieldNames.asScala
+        ))
+        .name(mapName)
+      case _ => result
+    }
+  }
+}

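Schematically, the translation above is the classic two-phase aggregation on the DataSet API. An illustrative hand-written analogue of a grouped SUM (Scala DataSet API, data assumed):

    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val t = env.fromElements(("hello", 1), ("hello", 2), ("world", 1))

    val result = t
      .map(r => r)   // stands for the generated "prepare" MapFunction
      .groupBy(0)    // groupingKeys
      .reduceGroup { rows =>   // stands for the generated GroupReduceFunction
        rows.reduce((a, b) => (a._1, a._2 + b._2))
      }

    result.print()   // prints (hello,3) and (world,1)
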
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
new file mode 100644
index 0000000..03178ad
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.FlinkCalc
+import org.apache.flink.table.typeutils.TypeConverter
+import TypeConverter._
+import org.apache.calcite.rex._
+import org.apache.flink.table.api.BatchTableEnvironment
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which translates a LogicalCalc (projection and filter) into a
+  * DataSet FlatMap operator.
+  */
+class DataSetCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowRelDataType: RelDataType,
+    private[flink] val calcProgram: RexProgram, // for tests
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, input)
+  with FlinkCalc
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetCalc(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      getRowType,
+      calcProgram,
+      ruleDescription)
+  }
+
+  override def toString: String = calcToString(calcProgram, getExpressionString)
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("select", selectionToString(calcProgram, getExpressionString))
+      .itemIf("where",
+        conditionToString(calcProgram, getExpressionString),
+        calcProgram.getCondition != null)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+
+    // compute the number of expressions that do not access a field or literal,
+    // i.e. computations, conditions, etc. We only want to account for
+    // computations, not for simple projections.
+    val compCnt = calcProgram.getExprList.asScala.count {
+      case _: RexInputRef => false
+      case _: RexLiteral => false
+      case _ => true
+    }
+
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
+  }
+
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+
+    if (calcProgram.getCondition != null) {
+      // we reduce the result cardinality to reward pushed-down filters,
+      // but estimate at least one result row
+      (rowCnt * 0.75).max(1.0)
+    } else {
+      rowCnt
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config, false, inputDS.getType)
+
+    val body = functionBody(
+      generator,
+      inputDS.getType,
+      getRowType,
+      calcProgram,
+      config,
+      expectedType)
+
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Any, Any]],
+      body,
+      returnType)
+
+    val mapFunc = calcMapFunction(genFunction)
+    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
+  }
+
+}

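The generated FlatMapFunction follows a fixed shape: evaluate the condition, then the projection. A hand-written analogue (the query and types are assumed):

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.util.Collector

    // analogue of a generated calc function for: SELECT a + 1 WHERE a > 0
    class CalcAnalogue extends FlatMapFunction[Integer, Integer] {
      override def flatMap(a: Integer, out: Collector[Integer]): Unit = {
        if (a > 0) {           // condition from the RexProgram
          out.collect(a + 1)   // projection expression
        }
      }
    }
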
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetConvention.scala
new file mode 100644
index 0000000..1b7bab5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetConvention.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+
+class DataSetConvention extends Convention {
+
+  override def toString: String = getName
+
+  override def useAbstractConvertersForConversion(
+      fromTraits: RelTraitSet,
+      toTraits: RelTraitSet): Boolean = false
+
+  override def canConvertConvention(toConvention: Convention): Boolean = false
+
+  def getInterface: Class[_] = classOf[DataSetRel]
+
+  def getName: String = "DATASET"
+
+  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
+
+  def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
+
+  def register(planner: RelOptPlanner): Unit = { }
+}
+
+object DataSetConvention {
+
+  val INSTANCE = new DataSetConvention
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
new file mode 100644
index 0000000..fa1afc3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.table.typeutils.TypeConverter._
+
+/**
+  * Flink RelNode which translates a join with a user-defined table function
+  * into a DataSet operator.
+  */
+class DataSetCorrelate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    scan: LogicalTableFunctionScan,
+    condition: Option[RexNode],
+    relRowType: RelDataType,
+    joinRowType: RelDataType,
+    joinType: SemiJoinType,
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataSetRel {
+
+  override def deriveRowType() = relRowType
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rowCnt = metadata.getRowCount(getInput) * 1.5
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 0.5)
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetCorrelate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      scan,
+      condition,
+      relRowType,
+      joinRowType,
+      joinType,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    val rexCall = scan.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+    correlateToString(rexCall, sqlFunction)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val rexCall = scan.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+    super.explainTerms(pw)
+      .item("invocation", scan.getCall)
+      .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
+      .item("rowType", relRowType)
+      .item("joinType", joinType)
+      .itemIf("condition", condition.orNull, condition.isDefined)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]])
+    : DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    // we do not need to specify the input type
+    val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
+    val rexCall = funcRel.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+    val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
+
+    val generator = new CodeGenerator(
+      config,
+      false,
+      inputDS.getType,
+      Some(udtfTypeInfo),
+      None,
+      Some(pojoFieldMapping))
+
+    val body = functionBody(
+      generator,
+      udtfTypeInfo,
+      getRowType,
+      rexCall,
+      condition,
+      config,
+      joinType,
+      expectedType)
+
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Any, Any]],
+      body,
+      returnType)
+
+    val mapFunc = correlateMapFunction(genFunction)
+
+    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
+  }
+}

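For orientation, such a correlate typically originates from a Table API join against a table function; a sketch (Split as in the earlier note, API as of this Flink version):

    // val split = new Split()
    // in.join(split('c) as 's).select('a, 's)          // -> SemiJoinType.INNER
    // in.leftOuterJoin(split('c) as 's).select('a, 's) // -> SemiJoinType.LEFT
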
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
new file mode 100644
index 0000000..332aa8a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.runtime.IntersectCoGroupFunction
+import org.apache.flink.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which translates an Intersect into a CoGroup operator.
+  */
+class DataSetIntersect(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
+    all: Boolean)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+    with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetIntersect(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      getRowType,
+      all
+    )
+  }
+
+  override def toString: String = {
+    s"Intersect(intersect: ($intersectSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("intersect", intersectSelectionToString)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
+      val rowCnt = metadata.getRowCount(child)
+      val rowSize = this.estimateRowSize(child.getRowType)
+      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+    val coGroupOpName = s"intersect: ($intersectSelectionToString)"
+    val coGroupFunction = new IntersectCoGroupFunction[Any](all)
+
+    val intersectDs = coGroupedDs.where("*").equalTo("*")
+      .`with`(coGroupFunction).name(coGroupOpName)
+
+    val config = tableEnv.getConfig
+    val leftType = leftDataSet.getType
+
+    // here we only care about the left type information, because we emit records from the left DataSet
+    expectedType match {
+      case None if config.getEfficientTypeUsage =>
+        intersectDs
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != leftType) {
+          val mapFunc = getConversionMapper(
+            config,
+            nullableInput = false,
+            leftType,
+            determinedType,
+            "DataSetIntersectConversion",
+            getRowType.getFieldNames)
+
+          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          intersectDs.map(mapFunc).name(opName)
+        }
+        // no conversion necessary, forward
+        else {
+          intersectDs
+        }
+    }
+  }
+
+  private def intersectSelectionToString: String = {
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}

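A sketch of the coGroup semantics relied on here, keyed on the whole row via where("*").equalTo("*"); this is an assumed equivalent of IntersectCoGroupFunction, whose actual implementation lives in flink.table.runtime:

    import org.apache.flink.api.common.functions.CoGroupFunction
    import org.apache.flink.util.Collector

    class IntersectSketch[T](all: Boolean) extends CoGroupFunction[T, T, T] {
      override def coGroup(
          first: java.lang.Iterable[T],
          second: java.lang.Iterable[T],
          out: Collector[T]): Unit = {
        val leftIt = first.iterator()
        val rightIt = second.iterator()
        if (!leftIt.hasNext || !rightIt.hasNext) {
          return // the value is missing on one side: not in the intersection
        }
        if (all) {
          // INTERSECT ALL: multiplicity is min(left count, right count)
          while (leftIt.hasNext && rightIt.hasNext) {
            out.collect(leftIt.next())
            rightIt.next()
          }
        } else {
          // INTERSECT: emit the value once
          out.collect(leftIt.next())
        }
      }
    }
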
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
new file mode 100644
index 0000000..324e949
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.FlatJoinRunner
+import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Flink RelNode which translates a logical join into a DataSet join operator.
+  */
+class DataSetJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    joinInfo: JoinInfo,
+    keyPairs: List[IntPair],
+    joinType: JoinRelType,
+    joinHint: JoinHint,
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      getRowType,
+      joinCondition,
+      joinRowType,
+      joinInfo,
+      keyPairs,
+      joinType,
+      joinHint,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+
+    val leftRowCnt = metadata.getRowCount(getLeft)
+    val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+    val rightRowCnt = metadata.getRowCount(getRight)
+    val rightRowSize = estimateRowSize(getRight.getRowType)
+
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+    val cpuCost = leftRowCnt + rightRowCnt
+    val rowCnt = leftRowCnt + rightRowCnt
+
+    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    // get the equality keys
+    val leftKeys = ArrayBuffer.empty[Int]
+    val rightKeys = ArrayBuffer.empty[Int]
+    if (keyPairs.isEmpty) {
+      // if no equality keys => not supported
+      throw TableException(
+        "Joins should have at least one equality condition.\n" +
+          s"\tLeft: ${left.toString},\n" +
+          s"\tRight: ${right.toString},\n" +
+          s"\tCondition: ($joinConditionToString)"
+      )
+    }
+    else {
+      // at least one equality expression
+      val leftFields = left.getRowType.getFieldList
+      val rightFields = right.getRowType.getFieldList
+
+      keyPairs.foreach(pair => {
+        val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+        val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+
+        // check if keys are compatible
+        if (leftKeyType == rightKeyType) {
+          // add key pair
+          leftKeys.add(pair.source)
+          rightKeys.add(pair.target)
+        } else {
+          throw TableException(
+            "Equality join predicate on incompatible types.\n" +
+              s"\tLeft: ${left.toString},\n" +
+              s"\tRight: ${right.toString},\n" +
+              s"\tCondition: ($joinConditionToString)"
+          )
+        }
+      })
+    }
+
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val (joinOperator, nullCheck) = joinType match {
+      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
+      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
+      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
+      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+    }
+
+    if (nullCheck && !config.getNullCheck) {
+      throw TableException("Null check in TableConfig must be enabled for outer joins.")
+    }
+
+    val generator = new CodeGenerator(
+      config,
+      nullCheck,
+      leftDataSet.getType,
+      Some(rightDataSet.getType))
+    val conversion = generator.generateConverterResultExpression(
+      returnType,
+      joinRowType.getFieldNames)
+
+    val body = if (joinInfo.isEqui) {
+      // only equality condition
+      s"""
+         |${conversion.code}
+         |${generator.collectorTerm}.collect(${conversion.resultTerm});
+         |""".stripMargin
+    }
+    else {
+      val condition = generator.generateExpression(joinCondition)
+      s"""
+         |${condition.code}
+         |if (${condition.resultTerm}) {
+         |  ${conversion.code}
+         |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+         |}
+         |""".stripMargin
+    }
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatJoinFunction[Any, Any, Any]],
+      body,
+      returnType)
+
+    val joinFun = new FlatJoinRunner[Any, Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+    val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
+
+    joinOperator.where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*)
+      .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]]
+  }
+
+  private def joinSelectionToString: String = {
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+  private def joinConditionToString: String = {
+
+    val inFields = joinRowType.getFieldNames.asScala.toList
+    getExpressionString(joinCondition, inFields, None)
+  }
+
+  private def joinTypeToString = joinType match {
+    case JoinRelType.INNER => "InnerJoin"
+    case JoinRelType.LEFT => "LeftOuterJoin"
+    case JoinRelType.RIGHT => "RightOuterJoin"
+    case JoinRelType.FULL => "FullOuterJoin"
+  }
+
+}

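Illustratively, the translation boils down to a standard DataSet equi-join whose generated FlatJoinFunction also evaluates any residual non-equi predicate (Scala DataSet API analogue, data assumed):

    import org.apache.flink.api.scala._
    import org.apache.flink.util.Collector

    // analogue of: SELECT * FROM l JOIN r ON l._1 = r._1 AND l._2 < r._2
    val env = ExecutionEnvironment.getExecutionEnvironment
    val l = env.fromElements((1, 10), (2, 20))
    val r = env.fromElements((1, 15), (2, 5))

    val joined = l.join(r)
      .where(0).equalTo(0)   // leftKeys / rightKeys derived from keyPairs
      .apply { (a, b, out: Collector[(Int, Int, Int, Int)]) =>
        // residual predicate, evaluated in the generated function body
        if (a._2 < b._2) out.collect((a._1, a._2, b._1, b._2))
      }

    joined.print()   // prints (1,10,1,15)
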
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
new file mode 100644
index 0000000..672ff9c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.runtime.MinusCoGroupFunction
+import org.apache.flink.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which implements the set minus operation.
+  */
+class DataSetMinus(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
+    all: Boolean)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+    with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetMinus(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      getRowType,
+      all
+    )
+  }
+
+  override def toString: String = {
+    s"Minus(minus: ($minusSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("minus", minusSelectionToString)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
+      val rowCnt = metadata.getRowCount(child)
+      val rowSize = this.estimateRowSize(child.getRowType)
+      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+    }
+  }
+
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    // from org.apache.calcite.rel.metadata.RelMdUtil.getMinusRowCount
+    val children = this.getInputs
+    var rowCnt = mq.getRowCount(children.head)
+    children.tail.foreach(rowCnt -= 0.5 * mq.getRowCount(_))
+    if (rowCnt < 0) {
+      rowCnt = 0.0
+    }
+    rowCnt
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+    val coGroupOpName = s"minus: ($minusSelectionToString)"
+    val coGroupFunction = new MinusCoGroupFunction[Any](all)
+
+    val minusDs = coGroupedDs.where("*").equalTo("*")
+      .`with`(coGroupFunction).name(coGroupOpName)
+
+    val config = tableEnv.getConfig
+    val leftType = leftDataSet.getType
+
+    // here we only care about the left type information, because we emit records from the left DataSet
+    expectedType match {
+      case None if config.getEfficientTypeUsage =>
+        minusDs
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != leftType) {
+          val mapFunc = getConversionMapper(
+            config = config,
+            nullableInput = false,
+            inputType = leftType,
+            expectedType = determinedType,
+            conversionOperatorName = "DataSetMinusConversion",
+            fieldNames = getRowType.getFieldNames)
+
+          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          minusDs.map(mapFunc).name(opName)
+        }
+        // no conversion necessary, forward
+        else {
+          minusDs
+        }
+    }
+  }
+
+  private def minusSelectionToString: String = {
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}
+
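
Note: the coGroup above keys both inputs on all fields (where("*").equalTo("*")),
so each group holds identical records from either side and MinusCoGroupFunction
can decide per group what to emit. A minimal sketch of that per-group logic,
assuming this simplified generic signature (the actual class lives in
org.apache.flink.table.runtime and may differ in detail):

    import org.apache.flink.api.common.functions.CoGroupFunction
    import org.apache.flink.util.Collector

    import scala.collection.mutable.ArrayBuffer

    class MinusCoGroupSketch[T](all: Boolean) extends CoGroupFunction[T, T, T] {

      override def coGroup(
          left: java.lang.Iterable[T],
          right: java.lang.Iterable[T],
          out: Collector[T]): Unit = {

        val leftIter = left.iterator()
        if (!leftIter.hasNext) {
          // nothing to subtract from in this group
          return
        }

        // count the matching records on the right side
        var rightCount = 0
        val rightIter = right.iterator()
        while (rightIter.hasNext) {
          rightIter.next()
          rightCount += 1
        }

        if (all) {
          // MINUS ALL: emit max(leftCount - rightCount, 0) copies
          val leftRecords = ArrayBuffer.empty[T]
          while (leftIter.hasNext) {
            leftRecords += leftIter.next()
          }
          leftRecords.drop(rightCount).foreach(out.collect)
        } else if (rightCount == 0) {
          // MINUS: emit a single record if the right side has no match
          out.collect(leftIter.next())
        }
      }
    }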

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
new file mode 100644
index 0000000..210ae03
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.FlinkRel
+
+import scala.collection.JavaConversions._
+
+trait DataSetRel extends RelNode with FlinkRel {
+
+  /**
+    * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
+    *
+    * @param tableEnv     [[BatchTableEnvironment]] of the translated Table.
+    * @param expectedType specifies the type the Flink operator should return. The type must
+    *                     have the same arity as the result. For instance, if the
+    *                     expected type is a RowTypeInfo, this method returns a DataSet of
+    *                     type Row. If the expected type is Tuple2, the operator returns
+    *                     a Tuple2 if possible, and Row otherwise.
+    * @return DataSet of type expectedType or RowTypeInfo
+    */
+  def translateToPlan(
+     tableEnv: BatchTableEnvironment,
+     expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
+
+  private[flink] def estimateRowSize(rowType: RelDataType): Double = {
+
+    rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
+      t match {
+        case SqlTypeName.TINYINT => s + 1
+        case SqlTypeName.SMALLINT => s + 2
+        case SqlTypeName.INTEGER => s + 4
+        case SqlTypeName.BIGINT => s + 8
+        case SqlTypeName.BOOLEAN => s + 1
+        case SqlTypeName.FLOAT => s + 4
+        case SqlTypeName.DOUBLE => s + 8
+        case SqlTypeName.VARCHAR => s + 12
+        case SqlTypeName.CHAR => s + 1
+        case SqlTypeName.DECIMAL => s + 12
+        case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
+        case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
+        case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
+        case _ => throw TableException(s"Unsupported data type encountered: $t")
+      }
+    }
+
+  }
+
+}
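
As a worked example of estimateRowSize: a row type with the fields
(id INTEGER, name VARCHAR, amount DOUBLE) is estimated at 4 + 12 + 8 = 24
bytes per row. The same fold, spelled out on plain type tags (purely
illustrative, independent of Calcite):

    val fieldSizes = Map("INTEGER" -> 4, "VARCHAR" -> 12, "DOUBLE" -> 8)
    val rowType = Seq("INTEGER", "VARCHAR", "DOUBLE")
    val estimatedRowSize = rowType.foldLeft(0)((s, t) => s + fieldSizes(t))
    // estimatedRowSize == 24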

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
new file mode 100644
index 0000000..48bbb74
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.plan.schema.DataSetTable
+
+/**
+  * Flink RelNode which translates the scan of a registered DataSet.
+  * It ensures that types without deterministic field order (e.g. POJOs) are not part of
+  * the plan translation.
+  */
+class DataSetScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    rowRelDataType: RelDataType)
+  extends BatchScan(cluster, traitSet, table) {
+
+  val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetScan(
+      cluster,
+      traitSet,
+      getTable,
+      getRowType
+    )
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+    val inputDataSet: DataSet[Any] = dataSetTable.dataSet
+
+    convertToExpectedType(inputDataSet, dataSetTable, expectedType, config)
+  }
+
+}
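
For context, a DataSetScan is planned whenever a query reads a DataSet that was
registered as a table. A usage sketch against the Scala batch Table API of this
version (table and field names are arbitrary):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val ds = env.fromElements((1, "a"), (2, "b"))
    tEnv.registerDataSet("t", ds, 'id, 'name)

    // reading "t" produces a DataSetScan over the registered DataSet
    val result = tEnv.sql("SELECT id, name FROM t")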

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
new file mode 100644
index 0000000..a70b4ab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a join where one of its inputs is a single row.
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val child = if (leftIsSingle) {
+      this.getRight
+    } else {
+      this.getLeft
+    }
+    val rowCnt = metadata.getRowCount(child)
+    val rowSize = this.estimateRowSize(child.getRowType)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val broadcastSetName = "joinSet"
+    val mapSideJoin = generateMapFunction(
+      tableEnv.getConfig,
+      leftDataSet.getType,
+      rightDataSet.getType,
+      leftIsSingle,
+      joinCondition,
+      broadcastSetName,
+      expectedType)
+
+    val (multiRowDataSet, singleRowDataSet) =
+      if (leftIsSingle) {
+        (rightDataSet, leftDataSet)
+      } else {
+        (leftDataSet, rightDataSet)
+      }
+
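+    // stream the multi-row input through the flatMap and attach the single-row
+    // input as a broadcast set; the runner reads the single row in open()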
+    multiRowDataSet
+      .flatMap(mapSideJoin)
+      .withBroadcastSet(singleRowDataSet, broadcastSetName)
+      .name(getMapOperatorName)
+      .asInstanceOf[DataSet[Any]]
+  }
+
+  private def generateMapFunction(
+      config: TableConfig,
+      inputType1: TypeInformation[Any],
+      inputType2: TypeInformation[Any],
+      firstIsSingle: Boolean,
+      joinCondition: RexNode,
+      broadcastInputSetName: String,
+      expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
+
+    val codeGenerator = new CodeGenerator(
+      config,
+      false,
+      inputType1,
+      Some(inputType2))
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val conversion = codeGenerator.generateConverterResultExpression(
+      returnType,
+      joinRowType.getFieldNames)
+
+    val condition = codeGenerator.generateExpression(joinCondition)
+
+    val joinMethodBody =
+      s"""
+         |${condition.code}
+         |if (${condition.resultTerm}) {
+         |  ${conversion.code}
+         |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+         |}
+         |""".stripMargin
+
+    val genFunction = codeGenerator.generateFunction(
+      ruleDescription,
+      classOf[FlatJoinFunction[Any, Any, Any]],
+      joinMethodBody,
+      returnType)
+
+    if (firstIsSingle) {
+      new MapJoinRightRunner[Any, Any, Any](
+        genFunction.name,
+        genFunction.code,
+        genFunction.returnType,
+        broadcastInputSetName)
+    } else {
+      new MapJoinLeftRunner[Any, Any, Any](
+        genFunction.name,
+        genFunction.code,
+        genFunction.returnType,
+        broadcastInputSetName)
+    }
+  }
+
+  private def getMapOperatorName: String = {
+    s"where: ($joinConditionToString), join: ($joinSelectionToString)"
+  }
+
+  private def joinSelectionToString: String = {
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+  private def joinConditionToString: String = {
+    val inFields = joinRowType.getFieldNames.asScala.toList
+    getExpressionString(joinCondition, inFields, None)
+  }
+
+  private def joinTypeToString: String = {
+    "NestedLoopJoin"
+  }
+
+}
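
The runners wrap the generated FlatJoinFunction and pull the single broadcast
record in open(). A minimal sketch of the underlying pattern, assuming a
pre-built FlatJoinFunction rather than generated code (the actual runners in
org.apache.flink.table.runtime may differ in detail):

    import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    class BroadcastSingleRowJoinSketch[L, R, O](
        joinFunction: FlatJoinFunction[L, R, O],
        broadcastSetName: String)
      extends RichFlatMapFunction[L, O] {

      private var singleRow: R = _

      override def open(parameters: Configuration): Unit = {
        // the single-row side was attached via withBroadcastSet(...)
        singleRow = getRuntimeContext
          .getBroadcastVariable[R](broadcastSetName)
          .get(0)
      }

      override def flatMap(multiRowRecord: L, out: Collector[O]): Unit = {
        joinFunction.join(multiRowRecord, singleRow, out)
      }
    }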

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
new file mode 100644
index 0000000..428ea84
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
+import org.apache.flink.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inp: RelNode,
+    collations: RelCollation,
+    rowRelDataType: RelDataType,
+    offset: RexNode,
+    fetch: RexNode)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel {
+
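+  // limitStart/limitEnd describe the half-open range [limitStart, limitEnd) of
+  // global row indexes to emit, e.g. OFFSET 10 FETCH 5 yields [10, 15)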
+  private val limitStart: Long = if (offset != null) {
+    RexLiteral.intValue(offset)
+  } else {
+    0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+    RexLiteral.intValue(fetch) + limitStart
+  } else {
+    Long.MaxValue
+  }
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new DataSetSort(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      collations,
+      getRowType,
+      offset,
+      fetch
+    )
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None)
+    : DataSet[Any] = {
+
+    if (fieldCollations.isEmpty) {
+      throw TableException("Limiting the result without sorting is not allowed " +
+        "as it could lead to arbitrary results.")
+    }
+
+    val config = tableEnv.getConfig
+
+    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val currentParallelism = inputDs.getExecutionEnvironment.getParallelism
+    val rangePartitionedDs = if (currentParallelism == 1) {
+      inputDs
+    } else {
+      inputDs.partitionByRange(fieldCollations.map(_._1): _*)
+        .withOrders(fieldCollations.map(_._2): _*)
+    }
+
+    // apply the local sort for each collation field in order
+    val partitionedDs = fieldCollations.foldLeft(rangePartitionedDs) {
+      (ds, fieldCollation) => ds.sortPartition(fieldCollation._1, fieldCollation._2)
+    }
+
+    val limitedDs = if (offset == null && fetch == null) {
+      partitionedDs
+    } else {
+      val countFunction = new CountPartitionFunction[Any]
+
+      val partitionCountName = "prepare offset/fetch"
+
+      val partitionCount = partitionedDs
+        .mapPartition(countFunction)
+        .name(partitionCountName)
+
+      val broadcastName = "countPartition"
+
+      val limitFunction = new LimitFilterFunction[Any](
+        limitStart,
+        limitEnd,
+        broadcastName)
+
+      val limitName = s"offset: $offsetToString, fetch: $fetchToString"
+
+      partitionedDs
+        .filter(limitFunction)
+        .name(limitName)
+        .withBroadcastSet(partitionCount, broadcastName)
+    }
+
+    val inputType = partitionedDs.getType
+    expectedType match {
+
+      case None if config.getEfficientTypeUsage =>
+        limitedDs
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != inputType) {
+
+          val mapFunc = getConversionMapper(
+            config = config,
+            nullableInput = false,
+            inputType = partitionedDs.getType,
+            expectedType = determinedType,
+            conversionOperatorName = "DataSetSortConversion",
+            fieldNames = getRowType.getFieldNames.asScala
+          )
+
+          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          limitedDs.map(mapFunc).name(opName)
+        }
+        // no conversion necessary, forward
+        else {
+          limitedDs
+        }
+    }
+  }
+
+  private def directionToOrder(direction: Direction) = {
+    direction match {
+      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+      case _ => throw new IllegalArgumentException("Unsupported direction.")
+    }
+  }
+
+  private val fieldCollations = collations.getFieldCollations.asScala
+    .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+  private val sortFieldsToString = fieldCollations
+    .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}").mkString(", ")
+
+  private val offsetToString = s"$offset"
+
+  private val fetchToString = if (limitEnd == Long.MaxValue) {
+    "unlimited"
+  } else {
+    s"$limitEnd"
+  }
+
+  override def toString: String =
+    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("orderBy", sortFieldsToString)
+      .item("offset", offsetToString)
+      .item("fetch", fetchToString)
+  }
+}
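
The offset/fetch path counts rows per partition, broadcasts the counts, and
filters by a global row index. A condensed sketch of that filter's logic,
assuming the counts arrive as (subtaskIndex, count) pairs (the actual
LimitFilterFunction in org.apache.flink.table.runtime may differ in detail):

    import org.apache.flink.api.common.functions.RichFilterFunction
    import org.apache.flink.configuration.Configuration

    import scala.collection.JavaConverters._

    class LimitFilterSketch[T](limitStart: Long, limitEnd: Long, broadcastName: String)
      extends RichFilterFunction[T] {

      private var rowIndex: Long = _

      override def open(parameters: Configuration): Unit = {
        val counts = getRuntimeContext
          .getBroadcastVariable[(Int, Long)](broadcastName)
          .asScala
        // start this subtask's global index after all rows of preceding partitions
        val thisSubtask = getRuntimeContext.getIndexOfThisSubtask
        rowIndex = counts.filter(_._1 < thisSubtask).map(_._2).sum
      }

      override def filter(value: T): Boolean = {
        val index = rowIndex
        rowIndex += 1
        index >= limitStart && index < limitEnd
      }
    }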

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
new file mode 100644
index 0000000..b0c95b5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which translates a Union into a DataSet union operator.
+  */
+class DataSetUnion(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetUnion(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowRelDataType
+    )
+  }
+
+  override def toString: String = {
+    s"Union(union: ($unionSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("union", unionSelectionToString)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+
+    val children = this.getInputs
+    val rowCnt = children.foldLeft(0D) { (rows, child) =>
+      rows + metadata.getRowCount(child)
+    }
+
+    planner.getCostFactory.makeCost(rowCnt, 0, 0)
+  }
+
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    // adopted from org.apache.calcite.rel.metadata.RelMdUtil.getUnionAllRowCount
+    getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    // if no type is expected, translate the left input first and request its
+    // result type from the right input, such that both inputs have identical types
+    val (leftDataSet, rightDataSet) = expectedType match {
+      case None =>
+        val leftDs = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+        val rightDs =
+          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDs.getType))
+        (leftDs, rightDs)
+      case _ =>
+        (left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType),
+          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType))
+    }
+
+    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+  }
+
+  private def unionSelectionToString: String = {
+    rowRelDataType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}
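
For reference, this node covers UNION ALL directly; a distinct UNION is
normalized by the planner into a union followed by a duplicate-eliminating
aggregation before translation. A usage sketch in SQL (table names are
arbitrary, both inputs must have identical schemas):

    // plans a DataSetUnion over the two scans
    val result = tEnv.sql("SELECT * FROM t1 UNION ALL SELECT * FROM t2")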

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
new file mode 100644
index 0000000..e0282f2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.io.ValuesInputFormat
+import org.apache.flink.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+/**
+  * DataSet RelNode for a LogicalValues.
+  */
+class DataSetValues(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    rowRelDataType: RelDataType,
+    tuples: ImmutableList[ImmutableList[RexLiteral]],
+    ruleDescription: String)
+  extends Values(cluster, rowRelDataType, tuples, traitSet)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetValues(
+      cluster,
+      traitSet,
+      getRowType,
+      getTuples,
+      ruleDescription
+    )
+  }
+
+  override def toString: String = {
+    s"Values(values: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("values", valuesFieldsToString)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config)
+
+    // generate code for every record
+    val generatedRecords = getTuples.asScala.map { r =>
+      generator.generateResultExpression(
+        returnType,
+        getRowType.getFieldNames.asScala,
+        r.asScala)
+    }
+
+    // generate input format
+    val generatedFunction = generator.generateValuesInputFormat(
+      ruleDescription,
+      generatedRecords.map(_.code),
+      returnType)
+
+    val inputFormat = new ValuesInputFormat[Any](
+      generatedFunction.name,
+      generatedFunction.code,
+      generatedFunction.returnType)
+
+    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
+  }
+
+  private def valuesFieldsToString: String = {
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}
+
+
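
A DataSetValues node originates from literal rows, e.g. a SQL VALUES clause:
for every literal row, an expression is generated, and all of them are compiled
into a single ValuesInputFormat. A usage sketch, assuming VALUES is supported
in this version's SQL dialect (the aliases are arbitrary):

    // plans a DataSetValues with two generated records
    val values = tEnv.sql(
      "SELECT * FROM (VALUES (1, 'Hello'), (2, 'World')) AS T(id, msg)")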