Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:48 UTC

[19/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
new file mode 100644
index 0000000..9902486
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+
+import scala.collection.JavaConverters._
+
+class DataStreamAggregate(
+    window: LogicalWindow,
+    namedProperties: Seq[NamedWindowProperty],
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamAggregate(
+      window,
+      namedProperties,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      getRowType,
+      inputType,
+      grouping)
+  }
+
+  override def toString: String = {
+    s"Aggregate(${
+      if (!grouping.isEmpty) {
+        s"groupBy: (${groupingToString(inputType, grouping)}), "
+      } else {
+        ""
+      }
+    }window: ($window), " +
+      s"select: (${
+        aggregationToString(
+          inputType,
+          grouping,
+          getRowType,
+          namedAggregates,
+          namedProperties)
+      }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+      .item("window", window)
+      .item("select", aggregationToString(
+        inputType,
+        grouping,
+        getRowType,
+        namedAggregates,
+        namedProperties))
+  }
+
+  override def translateToPlan(
+    tableEnv: StreamTableEnvironment,
+    expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+    val config = tableEnv.getConfig
+    val groupingKeys = grouping.indices.toArray
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
+      tableEnv,
+      // tell the input operator that this operator currently only supports Rows as input
+      Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+    // get the output types
+    val fieldTypes: Array[TypeInformation[_]] =
+      getRowType.getFieldList.asScala
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+      .toArray
+
+    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
+
+    val aggString = aggregationToString(
+      inputType,
+      grouping,
+      getRowType,
+      namedAggregates,
+      namedProperties)
+
+    val prepareOpName = s"prepare select: ($aggString)"
+    val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+      s"window: ($window), " +
+      s"select: ($aggString)"
+    val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+    val mapFunction = AggregateUtil.createPrepareMapFunction(
+      namedAggregates,
+      grouping,
+      inputType)
+
+    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
+
+    val result: DataStream[Any] = {
+      // check whether all aggregates support partial aggregate
+      if (AggregateUtil.doAllSupportPartialAggregation(
+            namedAggregates.map(_.getKey),
+            inputType,
+            grouping.length)) {
+        // do Incremental Aggregation
+        val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+          namedAggregates,
+          inputType,
+          getRowType,
+          grouping)
+        // grouped / keyed aggregation
+        if (groupingKeys.length > 0) {
+          val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
+            window,
+            namedAggregates,
+            inputType,
+            rowRelDataType,
+            grouping,
+            namedProperties)
+
+          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+          val windowedStream =
+            createKeyedWindowedStream(window, keyedStream)
+            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+          windowedStream
+          .apply(reduceFunction, windowFunction)
+          .returns(rowTypeInfo)
+          .name(keyedAggOpName)
+          .asInstanceOf[DataStream[Any]]
+        }
+        // global / non-keyed aggregation
+        else {
+          val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
+            window,
+            namedAggregates,
+            inputType,
+            rowRelDataType,
+            grouping,
+            namedProperties)
+
+          val windowedStream =
+            createNonKeyedWindowedStream(window, mappedInput)
+            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+          windowedStream
+          .apply(reduceFunction, windowFunction)
+          .returns(rowTypeInfo)
+          .name(nonKeyedAggOpName)
+          .asInstanceOf[DataStream[Any]]
+        }
+      }
+      else {
+        // do non-Incremental Aggregation
+        // grouped / keyed aggregation
+        if (groupingKeys.length > 0) {
+
+          val windowFunction = AggregateUtil.createWindowAggregationFunction(
+            window,
+            namedAggregates,
+            inputType,
+            rowRelDataType,
+            grouping,
+            namedProperties)
+
+          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+          val windowedStream =
+            createKeyedWindowedStream(window, keyedStream)
+            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+          windowedStream
+          .apply(windowFunction)
+          .returns(rowTypeInfo)
+          .name(keyedAggOpName)
+          .asInstanceOf[DataStream[Any]]
+        }
+        // global / non-keyed aggregation
+        else {
+          val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
+            window,
+            namedAggregates,
+            inputType,
+            rowRelDataType,
+            grouping,
+            namedProperties)
+
+          val windowedStream =
+            createNonKeyedWindowedStream(window, mappedInput)
+            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+          windowedStream
+          .apply(windowFunction)
+          .returns(rowTypeInfo)
+          .name(nonKeyedAggOpName)
+          .asInstanceOf[DataStream[Any]]
+        }
+      }
+    }
+    // if the expected type is not a Row, inject a mapper to convert to the expected type
+    expectedType match {
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+        result.map(getConversionMapper(
+          config = config,
+          nullableInput = false,
+          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+          expectedType = expectedType.get,
+          conversionOperatorName = "DataStreamAggregateConversion",
+          fieldNames = getRowType.getFieldNames.asScala
+        ))
+          .name(mapName)
+      case _ => result
+    }
+  }
+}
+object DataStreamAggregate {
+
+  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
+    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+
+    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+      stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
+
+    case ProcessingTimeTumblingGroupWindow(_, size) =>
+      stream.countWindow(asCount(size))
+
+    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      stream.window(TumblingEventTimeWindows.of(asTime(size)))
+
+    case EventTimeTumblingGroupWindow(_, _, size) =>
+      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeTumblingGroupWindow
+      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+        "currently not supported.")
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+      stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+      stream.countWindow(asCount(size), asCount(slide))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeSlidingGroupWindow
+      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+        "currently not supported.")
+
+    case ProcessingTimeSessionGroupWindow(_, gap) =>
+      stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+    case EventTimeSessionGroupWindow(_, _, gap) =>
+      stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
+  }
+
+  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
+    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+
+    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+      stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
+
+    case ProcessingTimeTumblingGroupWindow(_, size) =>
+      stream.countWindowAll(asCount(size))
+
+    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
+
+    case EventTimeTumblingGroupWindow(_, _, size) =>
+      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeTumblingGroupWindow
+      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+        "currently not supported.")
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+      stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+      stream.countWindowAll(asCount(size), asCount(slide))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeSlidingGroupWindow
+      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+        "currently not supported.")
+
+    case ProcessingTimeSessionGroupWindow(_, gap) =>
+      stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+    case EventTimeSessionGroupWindow(_, _, gap) =>
+      stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
+  }
+
+  def asTime(expr: Expression): Time = expr match {
+    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
+    case _ => throw new IllegalArgumentException(
+      s"A time interval literal is expected, but got: $expr")
+  }
+
+  def asCount(expr: Expression): Long = expr match {
+    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+    case _ => throw new IllegalArgumentException(
+      s"A row interval literal is expected, but got: $expr")
+  }
+}
+
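
[Illustration, not part of the patch.] The asTime and asCount helpers above accept only interval
literals; any other expression is rejected. A minimal sketch of how a window size resolves,
assuming Literal is constructed as (value, resultType), as the pattern matches above suggest:

    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.expressions.Literal
    import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}

    // a 10-second window size as it appears in the logical plan
    val size = Literal(10000L, TimeIntervalTypeInfo.INTERVAL_MILLIS)
    val time: Time = DataStreamAggregate.asTime(size)    // Time.milliseconds(10000)

    // a 100-row count window size resolves through asCount instead
    val rows = Literal(100L, RowIntervalTypeInfo.INTERVAL_ROWS)
    val count: Long = DataStreamAggregate.asCount(rows)  // 100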

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
new file mode 100644
index 0000000..774c17b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.FlinkCalc
+import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+/**
+  * Flink RelNode that is translated into a generated [[FlatMapFunction]] operator.
+  */
+class DataStreamCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowRelDataType: RelDataType,
+    calcProgram: RexProgram,
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, input)
+  with FlinkCalc
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamCalc(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      getRowType,
+      calcProgram,
+      ruleDescription
+    )
+  }
+
+  override def toString: String = calcToString(calcProgram, getExpressionString)
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("select", selectionToString(calcProgram, getExpressionString))
+      .itemIf("where",
+        conditionToString(calcProgram, getExpressionString),
+        calcProgram.getCondition != null)
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config, false, inputDataStream.getType)
+
+    val body = functionBody(
+      generator,
+      inputDataStream.getType,
+      getRowType,
+      calcProgram,
+      config,
+      expectedType)
+
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Any, Any]],
+      body,
+      returnType)
+
+    val mapFunc = calcMapFunction(genFunction)
+    inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamConvention.scala
new file mode 100644
index 0000000..9525d1f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamConvention.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+
+class DataStreamConvention extends Convention {
+
+  override def toString: String = getName
+
+  override def useAbstractConvertersForConversion(
+    fromTraits: RelTraitSet,
+    toTraits: RelTraitSet): Boolean = false
+
+  override def canConvertConvention(toConvention: Convention): Boolean = false
+
+  def getInterface: Class[_] = classOf[DataStreamRel]
+
+  def getName: String = "DATASTREAM"
+
+  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
+
+  def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
+
+  def register(planner: RelOptPlanner): Unit = { }
+}
+
+object DataStreamConvention {
+
+  val INSTANCE = new DataStreamConvention
+}
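
[Illustration, not part of the patch.] DataStreamConvention.INSTANCE is the trait that converter
rules target when translating logical nodes into DataStream nodes. A hedged sketch, mirroring the
trait replacement that BatchTableSourceScanRule performs later in this commit:

    import org.apache.calcite.plan.RelTraitSet
    import org.apache.calcite.rel.RelNode

    // inside a ConverterRule.convert(rel): swap the convention trait so the
    // planner knows the result is executable as a DataStream node
    def toDataStreamTraits(rel: RelNode): RelTraitSet =
      rel.getTraitSet.replace(DataStreamConvention.INSTANCE)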

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
new file mode 100644
index 0000000..a2d167b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+/**
+  * Flink RelNode that joins an input stream with the results of a user-defined
+  * table function.
+  */
+class DataStreamCorrelate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    scan: LogicalTableFunctionScan,
+    condition: Option[RexNode],
+    relRowType: RelDataType,
+    joinRowType: RelDataType,
+    joinType: SemiJoinType,
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataStreamRel {
+
+  override def deriveRowType() = relRowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamCorrelate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      scan,
+      condition,
+      relRowType,
+      joinRowType,
+      joinType,
+      ruleDescription)
+  }
+
+  override def toString: String = {
+    val rexCall = scan.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+    correlateToString(rexCall, sqlFunction)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val rexCall = scan.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+    super.explainTerms(pw)
+      .item("invocation", scan.getCall)
+      .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
+      .item("rowType", relRowType)
+      .item("joinType", joinType)
+      .itemIf("condition", condition.orNull, condition.isDefined)
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]])
+    : DataStream[Any] = {
+
+    val config = tableEnv.getConfig
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    // we do not need to specify input type
+    val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+    val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
+    val rexCall = funcRel.getCall.asInstanceOf[RexCall]
+    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
+    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+    val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
+
+    val generator = new CodeGenerator(
+      config,
+      false,
+      inputDS.getType,
+      Some(udtfTypeInfo),
+      None,
+      Some(pojoFieldMapping))
+
+    val body = functionBody(
+      generator,
+      udtfTypeInfo,
+      getRowType,
+      rexCall,
+      condition,
+      config,
+      joinType,
+      expectedType)
+
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Any, Any]],
+      body,
+      returnType)
+
+    val mapFunc = correlateMapFunction(genFunction)
+
+    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
new file mode 100644
index 0000000..16427b8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.nodes.FlinkRel
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+trait DataStreamRel extends RelNode with FlinkRel {
+
+  /**
+    * Translates the FlinkRelNode into a Flink operator.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
+    * @param expectedType specifies the type the Flink operator should return. The type must
+    *                     have the same arity as the result. For instance, if the
+    *                     expected type is a RowTypeInfo, this method will return a DataStream
+    *                     of type Row. If the expected type is Tuple2, the operator will
+    *                     return a Tuple2 if possible, and Row otherwise.
+    * @return DataStream of type expectedType or RowTypeInfo
+    */
+  def translateToPlan(
+    tableEnv: StreamTableEnvironment,
+    expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any]
+
+}
+
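
[Illustration, not part of the patch.] A hedged usage sketch (the childNode value is assumed):
callers that need a fixed input representation request it through expectedType, exactly as
DataStreamAggregate does above when it demands Rows from its input:

    // translate a child node, requiring the default Row representation;
    // passing None instead lets the operator pick the most efficient type
    val rowStream: DataStream[Any] = childNode
      .asInstanceOf[DataStreamRel]
      .translateToPlan(tableEnv, Some(TypeConverter.DEFAULT_ROW_TYPE))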

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
new file mode 100644
index 0000000..2d5ec09
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+/**
+  * Flink RelNode that scans a registered [[DataStream]].
+  * It ensures that types without deterministic field order (e.g. POJOs) are not part of
+  * the plan translation.
+  */
+class DataStreamScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    rowRelDataType: RelDataType)
+  extends StreamScan(cluster, traitSet, table) {
+
+  val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamScan(
+      cluster,
+      traitSet,
+      getTable,
+      getRowType
+    )
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+    val config = tableEnv.getConfig
+    val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
+
+    convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
new file mode 100644
index 0000000..beb15d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes the union of two input DataStreams.
+  */
+class DataStreamUnion(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamUnion(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      getRowType
+    )
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("union", unionSelectionToString)
+  }
+
+  override def toString: String = {
+    s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+    val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    leftDataStream.union(rightDataStream)
+  }
+
+  private def unionSelectionToString: String = {
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
new file mode 100644
index 0000000..f2a3d72
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.io.ValuesInputFormat
+import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+import scala.collection.JavaConverters._
+
+/**
+  * DataStream RelNode for LogicalValues.
+  */
+class DataStreamValues(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    rowRelDataType: RelDataType,
+    tuples: ImmutableList[ImmutableList[RexLiteral]],
+    ruleDescription: String)
+  extends Values(cluster, rowRelDataType, tuples, traitSet)
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamValues(
+      cluster,
+      traitSet,
+      getRowType,
+      getTuples,
+      ruleDescription
+    )
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]])
+    : DataStream[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config)
+
+    // generate code for every record
+    val generatedRecords = getTuples.asScala.map { r =>
+      generator.generateResultExpression(
+        returnType,
+        getRowType.getFieldNames.asScala,
+        r.asScala)
+    }
+
+    // generate input format
+    val generatedFunction = generator.generateValuesInputFormat(
+      ruleDescription,
+      generatedRecords.map(_.code),
+      returnType)
+
+    val inputFormat = new ValuesInputFormat[Any](
+      generatedFunction.name,
+      generatedFunction.code,
+      generatedFunction.returnType)
+
+    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
new file mode 100644
index 0000000..ddac958
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.TableScan
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.TableConfig
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+abstract class StreamScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable)
+  extends TableScan(cluster, traitSet, table)
+  with DataStreamRel {
+
+  protected def convertToExpectedType(
+      input: DataStream[Any],
+      flinkTable: FlinkTable[_],
+      expectedType: Option[TypeInformation[Any]],
+      config: TableConfig): DataStream[Any] = {
+
+    val inputType = input.getType
+
+    expectedType match {
+
+      // special case:
+      // if efficient type usage is enabled and no expected type is set
+      // we can simply forward the DataSet to the next operator.
+      // however, we cannot forward PojoTypes as their fields don't have an order
+      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
+        input
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != inputType) {
+          val generator = new CodeGenerator(
+            config,
+            nullableInput = false,
+            input.getType,
+            flinkTable.fieldIndexes)
+
+          val conversion = generator.generateConverterResultExpression(
+            determinedType,
+            getRowType.getFieldNames)
+
+          val body =
+            s"""
+               |${conversion.code}
+               |return ${conversion.resultTerm};
+               |""".stripMargin
+
+          val genFunction = generator.generateFunction(
+            "DataSetSourceConversion",
+            classOf[MapFunction[Any, Any]],
+            body,
+            determinedType)
+
+          val mapFunc = new MapRunner[Any, Any](
+            genFunction.name,
+            genFunction.code,
+            genFunction.returnType)
+
+          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          input.map(mapFunc).name(opName)
+        }
+        // no conversion necessary, forward
+        else {
+          input
+        }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
new file mode 100644
index 0000000..f86a54b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+
+/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
+class StreamTableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    tableSource: StreamTableSource[_])
+  extends StreamScan(cluster, traitSet, table) {
+
+  override def deriveRowType() = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new StreamTableSourceScan(
+      cluster,
+      traitSet,
+      getTable,
+      tableSource
+    )
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+
+    val config = tableEnv.getConfig
+    val inputDataStream: DataStream[Any] = tableSource
+      .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
+
+    convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/EnumerableToLogicalTableScan.scala
new file mode 100644
index 0000000..86b8a23
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/EnumerableToLogicalTableScan.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that converts an EnumerableTableScan into a LogicalTableScan.
+ * We need this rule because Calcite creates an EnumerableTableScan
+ * when parsing a SQL query. We convert it into a LogicalTableScan
+ * so we can merge the optimization process with any plan that might be created
+ * by the Table API.
+ */
+class EnumerableToLogicalTableScan(
+    operand: RelOptRuleOperand,
+    description: String) extends RelOptRule(operand, description) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
+    val table = oldRel.getTable
+    val newRel = LogicalTableScan.create(oldRel.getCluster, table)
+    call.transformTo(newRel)
+  }
+}
+
+object EnumerableToLogicalTableScan {
+  val INSTANCE = new EnumerableToLogicalTableScan(
+    operand(classOf[EnumerableTableScan], any),
+    "EnumerableToLogicalTableScan")
+}
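
[Illustration, not part of the patch.] The rule is stateless, so it is exposed as a singleton.
In this commit it is bundled into the rule sets below, but it could also be registered directly;
the planner value here is an assumed RelOptPlanner:

    // register the normalization rule; afterwards every EnumerableTableScan
    // produced by the SQL parser is rewritten into a LogicalTableScan
    planner.addRule(EnumerableToLogicalTableScan.INSTANCE)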

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
new file mode 100644
index 0000000..bcd12a4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules
+
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.{RuleSets, RuleSet}
+import org.apache.flink.table.plan.rules.dataSet._
+import org.apache.flink.table.plan.rules.datastream._
+
+object FlinkRuleSets {
+
+  /**
+    * RuleSet to optimize plans for batch / DataSet execution
+    */
+  val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
+
+    // convert a logical table scan to a relational expression
+    TableScanRule.INSTANCE,
+    EnumerableToLogicalTableScan.INSTANCE,
+
+    // push a filter into a join
+    FilterJoinRule.FILTER_ON_JOIN,
+    // push filter into the children of a join
+    FilterJoinRule.JOIN,
+    // push filter through an aggregation
+    FilterAggregateTransposeRule.INSTANCE,
+
+    // aggregation and projection rules
+    AggregateProjectMergeRule.INSTANCE,
+    AggregateProjectPullUpConstantsRule.INSTANCE,
+    // push a projection past a filter or vice versa
+    ProjectFilterTransposeRule.INSTANCE,
+    FilterProjectTransposeRule.INSTANCE,
+    // push a projection to the children of a join
+    ProjectJoinTransposeRule.INSTANCE,
+    // remove identity project
+    ProjectRemoveRule.INSTANCE,
+    // reorder sort and projection
+    SortProjectTransposeRule.INSTANCE,
+    ProjectSortTransposeRule.INSTANCE,
+
+    // join rules
+    JoinPushExpressionsRule.INSTANCE,
+
+    // remove union with only a single child
+    UnionEliminatorRule.INSTANCE,
+    // convert non-all union into all-union + distinct
+    UnionToDistinctRule.INSTANCE,
+
+    // remove aggregation if it does not aggregate and input is already distinct
+    AggregateRemoveRule.INSTANCE,
+    // push aggregate through join
+    AggregateJoinTransposeRule.EXTENDED,
+    // aggregate union rule
+    AggregateUnionAggregateRule.INSTANCE,
+
+    // remove unnecessary sort rule
+    SortRemoveRule.INSTANCE,
+
+    // simplify expressions rules
+    ReduceExpressionsRule.FILTER_INSTANCE,
+    ReduceExpressionsRule.PROJECT_INSTANCE,
+    ReduceExpressionsRule.CALC_INSTANCE,
+    ReduceExpressionsRule.JOIN_INSTANCE,
+
+    // prune empty results rules
+    PruneEmptyRules.AGGREGATE_INSTANCE,
+    PruneEmptyRules.FILTER_INSTANCE,
+    PruneEmptyRules.JOIN_LEFT_INSTANCE,
+    PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+    PruneEmptyRules.PROJECT_INSTANCE,
+    PruneEmptyRules.SORT_INSTANCE,
+    PruneEmptyRules.UNION_INSTANCE,
+
+    // calc rules
+    FilterCalcMergeRule.INSTANCE,
+    ProjectCalcMergeRule.INSTANCE,
+    FilterToCalcRule.INSTANCE,
+    ProjectToCalcRule.INSTANCE,
+    CalcMergeRule.INSTANCE,
+
+    // translate to Flink DataSet nodes
+    DataSetAggregateRule.INSTANCE,
+    DataSetAggregateWithNullValuesRule.INSTANCE,
+    DataSetCalcRule.INSTANCE,
+    DataSetJoinRule.INSTANCE,
+    DataSetSingleRowJoinRule.INSTANCE,
+    DataSetScanRule.INSTANCE,
+    DataSetUnionRule.INSTANCE,
+    DataSetIntersectRule.INSTANCE,
+    DataSetMinusRule.INSTANCE,
+    DataSetSortRule.INSTANCE,
+    DataSetValuesRule.INSTANCE,
+    DataSetCorrelateRule.INSTANCE,
+    BatchTableSourceScanRule.INSTANCE,
+    // project pushdown optimization
+    PushProjectIntoBatchTableSourceScanRule.INSTANCE
+  )
+
+  /**
+    * RuleSet to optimize plans for stream / DataStream execution
+    */
+  val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
+
+      // convert a logical table scan to a relational expression
+      TableScanRule.INSTANCE,
+      EnumerableToLogicalTableScan.INSTANCE,
+
+      // calc rules
+      FilterToCalcRule.INSTANCE,
+      ProjectToCalcRule.INSTANCE,
+      FilterCalcMergeRule.INSTANCE,
+      ProjectCalcMergeRule.INSTANCE,
+      CalcMergeRule.INSTANCE,
+
+      // prune empty results rules
+      PruneEmptyRules.FILTER_INSTANCE,
+      PruneEmptyRules.PROJECT_INSTANCE,
+      PruneEmptyRules.UNION_INSTANCE,
+
+      // push and merge projection rules
+      ProjectFilterTransposeRule.INSTANCE,
+      FilterProjectTransposeRule.INSTANCE,
+      ProjectRemoveRule.INSTANCE,
+
+      // simplify expressions rules
+      ReduceExpressionsRule.FILTER_INSTANCE,
+      ReduceExpressionsRule.PROJECT_INSTANCE,
+      ReduceExpressionsRule.CALC_INSTANCE,
+
+      // merge and push unions rules
+      UnionEliminatorRule.INSTANCE,
+
+      // translate to DataStream nodes
+      DataStreamAggregateRule.INSTANCE,
+      DataStreamCalcRule.INSTANCE,
+      DataStreamScanRule.INSTANCE,
+      DataStreamUnionRule.INSTANCE,
+      DataStreamValuesRule.INSTANCE,
+      DataStreamCorrelateRule.INSTANCE,
+      StreamTableSourceScanRule.INSTANCE
+  )
+
+}
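
[Illustration, not part of the patch.] A hedged sketch of consuming these rule sets (the planner
value is assumed): Calcite's RuleSet is an Iterable of RelOptRule, so every rule can be handed to
a planner in one pass:

    import scala.collection.JavaConverters._

    // add every DataStream optimization rule to the Calcite planner
    FlinkRuleSets.DATASTREAM_OPT_RULES.asScala.foreach(planner.addRule)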

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
new file mode 100644
index 0000000..d699585
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetConvention}
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.BatchTableSource
+
+/** Rule to convert a [[LogicalTableScan]] into a [[BatchTableSourceScan]]. */
+class BatchTableSourceScanRule
+  extends ConverterRule(
+      classOf[LogicalTableScan],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "BatchTableSourceScanRule")
+  {
+
+  /** Rule must only match if TableScan targets a [[BatchTableSource]] */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    dataSetTable match {
+      case tst: TableSourceTable =>
+        tst.tableSource match {
+          case _: BatchTableSource[_] =>
+            true
+          case _ =>
+            false
+        }
+      case _ =>
+        false
+    }
+  }
+
+  def convert(rel: RelNode): RelNode = {
+    val scan: TableScan = rel.asInstanceOf[TableScan]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+    val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource
+      .asInstanceOf[BatchTableSource[_]]
+    new BatchTableSourceScan(
+      rel.getCluster,
+      traitSet,
+      scan.getTable,
+      tableSource
+    )
+  }
+}
+
+object BatchTableSourceScanRule {
+  val INSTANCE: RelOptRule = new BatchTableSourceScanRule
+}
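
The matches() check above relies on RelOptTable.unwrap, which returns null when
the scanned table is not backed by the requested class; the pattern match
absorbs that null. The same dispatch, condensed (the helper name is
illustrative and not part of this commit):

    import org.apache.calcite.rel.core.TableScan
    import org.apache.flink.table.plan.schema.TableSourceTable
    import org.apache.flink.table.sources.BatchTableSource

    // True iff the scanned table wraps a BatchTableSource.
    def scansBatchSource(scan: TableScan): Boolean =
      Option(scan.getTable.unwrap(classOf[TableSourceTable]))
        .exists(_.tableSource.isInstanceOf[BatchTableSource[_]])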

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
new file mode 100644
index 0000000..d634a6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+
+import scala.collection.JavaConversions._
+
+class DataSetAggregateRule
+  extends ConverterRule(
+      classOf[LogicalAggregate],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "DataSetAggregateRule")
+  {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // a non-grouped aggregate must attach a null row to the source data;
+    // such queries are handled by DataSetAggregateWithNullValuesRule instead
+    if (agg.getGroupSet.isEmpty) {
+      return false
+    }
+
+    // check if we have distinct aggregates
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    if (distinctAggs) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+
+    !distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
+
+    new DataSetAggregate(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray)
+  }
+  }
+
+object DataSetAggregateRule {
+  val INSTANCE: RelOptRule = new DataSetAggregateRule
+}
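
Note that matches() throws a TableException for DISTINCT aggregates and
GROUPING SETS instead of returning false: a false here would only make the
Volcano planner fail later with a generic "could not be optimized" error,
whereas the exception names the unsupported feature directly. The final
boolean expression is consequently only reached once all checks have passed.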

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
new file mode 100644
index 0000000..b708af4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan._
+
+import scala.collection.JavaConversions._
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalUnion, LogicalValues}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+
+/**
+  * Rule that injects a [[org.apache.flink.types.Row]] with null values into the input of a
+  * [[DataSetAggregate]]. It applies only to non-grouped aggregate queries.
+  */
+class DataSetAggregateWithNullValuesRule
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetAggregateWithNullValuesRule")
+{
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // a grouped aggregate must not have a null row attached;
+    // such queries are handled by other rules, e.g. DataSetAggregateRule
+    if (!agg.getGroupSet.isEmpty) {
+      return false
+    }
+
+    // check if we have distinct aggregates
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    if (distinctAggs) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+    !distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val cluster: RelOptCluster = rel.getCluster
+
+    val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType)
+    val nullLiterals: ImmutableList[ImmutableList[RexLiteral]] =
+      ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+        for (fieldType <- fieldTypes)
+          yield {
+            cluster.getRexBuilder.
+              makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral]
+          }))
+
+    val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
+    val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
+
+    new DataSetAggregate(
+      cluster,
+      traitSet,
+      RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE),
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray
+    )
+  }
+}
+
+object DataSetAggregateWithNullValuesRule {
+  val INSTANCE: RelOptRule = new DataSetAggregateWithNullValuesRule
+}
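
The null-row union exists because a non-grouped aggregate must return exactly
one row even over an empty input. Standard SQL aggregates such as COUNT(x) and
SUM ignore null input values, so unioning one all-null row onto the input
leaves the aggregate results unchanged while guaranteeing a non-empty input.
A plain-Scala illustration of the semantics (not planner code):

    // COUNT over an empty input must still yield one result row with value 0.
    val input: Seq[Option[Int]] = Seq.empty
    val withNullRow = input :+ (None: Option[Int]) // the injected all-null row
    val count = withNullRow.count(_.isDefined)     // nulls are ignored => 0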

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCalcRule.scala
new file mode 100644
index 0000000..1d100fa
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCalcRule.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
+
+class DataSetCalcRule
+  extends ConverterRule(
+      classOf[LogicalCalc],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "DataSetCalcRule")
+  {
+
+    def convert(rel: RelNode): RelNode = {
+      val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
+      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE)
+
+      new DataSetCalc(
+        rel.getCluster,
+        traitSet,
+        convInput,
+        rel.getRowType,
+        calc.getProgram,
+        description)
+    }
+  }
+
+object DataSetCalcRule {
+  val INSTANCE: RelOptRule = new DataSetCalcRule
+}
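
DataSetCalc is the target of the calc rules listed in the rule sets above:
FilterToCalcRule and ProjectToCalcRule first rewrite filters and projections
into LogicalCalc nodes carrying a single RexProgram, CalcMergeRule fuses
adjacent calcs, and this rule then translates the merged result, so one
DataSet operator covers both projection and filtering.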

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
new file mode 100644
index 0000000..819bcae
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetCorrelateRule.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalTableFunctionScan}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
+
+/**
+  * Rule to convert a LogicalCorrelate into a DataSetCorrelate.
+  */
+class DataSetCorrelateRule
+  extends ConverterRule(
+      classOf[LogicalCorrelate],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "DataSetCorrelateRule") {
+
+    override def matches(call: RelOptRuleCall): Boolean = {
+      val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+      val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+      right match {
+        // the right input is a table function
+        case _: LogicalTableFunctionScan => true
+        // a filter is pushed above the table function
+        case filter: LogicalFilter =>
+          filter
+            .getInput.asInstanceOf[RelSubset]
+            .getOriginal
+            .isInstanceOf[LogicalTableFunctionScan]
+        case _ => false
+      }
+    }
+
+    override def convert(rel: RelNode): RelNode = {
+      val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
+      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+      val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
+      val right: RelNode = join.getInput(1)
+
+      def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataSetCorrelate = {
+        relNode match {
+          case rel: RelSubset =>
+            convertToCorrelate(rel.getRelList.get(0), condition)
+
+          case filter: LogicalFilter =>
+            convertToCorrelate(
+              filter.getInput.asInstanceOf[RelSubset].getOriginal,
+              Some(filter.getCondition))
+
+          case scan: LogicalTableFunctionScan =>
+            new DataSetCorrelate(
+              rel.getCluster,
+              traitSet,
+              convInput,
+              scan,
+              condition,
+              rel.getRowType,
+              join.getRowType,
+              join.getJoinType,
+              description)
+        }
+      }
+      convertToCorrelate(right, None)
+    }
+  }
+
+object DataSetCorrelateRule {
+  val INSTANCE: RelOptRule = new DataSetCorrelateRule
+}
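
convertToCorrelate mirrors the two shapes accepted by matches(): it peels
RelSubset wrappers and at most one LogicalFilter until it reaches the
LogicalTableFunctionScan, collecting the filter condition on the way down.
The descent pattern in isolation (a hedged sketch; the helper is illustrative
and not part of this commit):

    import org.apache.calcite.plan.volcano.RelSubset
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.rel.logical.{LogicalFilter, LogicalTableFunctionScan}

    // Unwrap planner subsets and filters until the table function scan appears.
    @annotation.tailrec
    def findScan(node: RelNode): Option[LogicalTableFunctionScan] = node match {
      case s: RelSubset                   => findScan(s.getOriginal)
      case f: LogicalFilter               => findScan(f.getInput)
      case scan: LogicalTableFunctionScan => Some(scan)
      case _                              => None
    }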

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetIntersectRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetIntersectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetIntersectRule.scala
new file mode 100644
index 0000000..d158f34
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetIntersectRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalIntersect
+import org.apache.flink.table.plan.nodes.dataset.{DataSetIntersect, DataSetConvention}
+
+class DataSetIntersectRule
+  extends ConverterRule(
+    classOf[LogicalIntersect],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetIntersectRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+
+    val intersect: LogicalIntersect = rel.asInstanceOf[LogicalIntersect]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(intersect.getInput(0), DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(intersect.getInput(1), DataSetConvention.INSTANCE)
+
+    new DataSetIntersect(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType,
+      intersect.all)
+  }
+}
+
+object DataSetIntersectRule {
+  val INSTANCE: RelOptRule = new DataSetIntersectRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
new file mode 100644
index 0000000..2874198
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetJoinRule.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalJoin
+
+import org.apache.flink.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
+
+import scala.collection.JavaConversions._
+
+class DataSetJoinRule
+  extends ConverterRule(
+      classOf[LogicalJoin],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "DataSetJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
+
+    val joinInfo = join.analyzeCondition
+
+    // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
+    !joinInfo.pairs().isEmpty
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+
+    val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
+    val joinInfo = join.analyzeCondition
+
+    new DataSetJoin(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType,
+      join.getCondition,
+      join.getRowType,
+      joinInfo,
+      joinInfo.pairs.toList,
+      join.getJoinType,
+      null,
+      description)
+  }
+
+}
+
+object DataSetJoinRule {
+  val INSTANCE: RelOptRule = new DataSetJoinRule
+}
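
The matches() check uses JoinInfo.pairs(), the left/right key pairs that
Calcite extracts from the conjunctive part of the join condition. For example,
ON l.a = r.b AND l.c > r.d yields one pair and is accepted, while
ON l.c > r.d alone yields none and is rejected, because the DataSet join
translation needs at least one equality key to partition on.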

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetMinusRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetMinusRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetMinusRule.scala
new file mode 100644
index 0000000..7172596
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetMinusRule.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalMinus
+import org.apache.calcite.rel.rules.UnionToDistinctRule
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus}
+
+class DataSetMinusRule
+  extends ConverterRule(
+    classOf[LogicalMinus],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetMinusRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+
+    val minus: LogicalMinus = rel.asInstanceOf[LogicalMinus]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val convLeft: RelNode = RelOptRule.convert(minus.getInput(0), DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(minus.getInput(1), DataSetConvention.INSTANCE)
+
+    new DataSetMinus(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType,
+      minus.all)
+  }
+}
+
+object DataSetMinusRule {
+  val INSTANCE: RelOptRule = new DataSetMinusRule
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetScanRule.scala
new file mode 100644
index 0000000..9d593aa
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetScanRule.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetScan}
+import org.apache.flink.table.plan.schema.DataSetTable
+
+class DataSetScanRule
+  extends ConverterRule(
+      classOf[LogicalTableScan],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "DataSetScanRule")
+  {
+
+  /**
+    * If the input is not a DataSetTable, we want the TableScanRule to match instead.
+    */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+    val dataSetTable = scan.getTable.unwrap(classOf[DataSetTable[Any]])
+    dataSetTable match {
+      case _: DataSetTable[Any] =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  def convert(rel: RelNode): RelNode = {
+    val scan: TableScan = rel.asInstanceOf[TableScan]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+
+    new DataSetScan(
+      rel.getCluster,
+      traitSet,
+      scan.getTable,
+      rel.getRowType
+    )
+  }
+}
+
+object DataSetScanRule {
+  val INSTANCE: RelOptRule = new DataSetScanRule
+}
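
DataSetScan fires for tables that were registered from a DataSet through the
Table API. A hedged usage sketch of how such a DataSetTable typically enters
the catalog (user-facing API of the refactored package; assumed rather than
shown in this hunk):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // The DataSet is wrapped in a DataSetTable, which this rule later unwraps.
    val ds = env.fromElements((1, "a"), (2, "b"))
    val table = tEnv.fromDataSet(ds, 'id, 'name)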

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
new file mode 100644
index 0000000..1f5c91a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin}
+
+class DataSetSingleRowJoinRule
+  extends ConverterRule(
+      classOf[LogicalJoin],
+      Convention.NONE,
+      DataSetConvention.INSTANCE,
+      "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+    if (isInnerJoin(join)) {
+      isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+        isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+    } else {
+      false
+    }
+  }
+
+  private def isInnerJoin(join: LogicalJoin) = {
+    join.getJoinType == JoinRelType.INNER
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+    node.isInstanceOf[LogicalAggregate] &&
+      isSingleRow(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleRow(agg: LogicalAggregate) = {
+    agg.getGroupSet.isEmpty
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val join = rel.asInstanceOf[LogicalJoin]
+    val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE)
+    val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE)
+    val leftIsSingle = isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+
+    new DataSetSingleRowJoin(
+      rel.getCluster,
+      traitSet,
+      dataSetLeftNode,
+      dataSetRightNode,
+      leftIsSingle,
+      rel.getRowType,
+      join.getCondition,
+      join.getRowType,
+      description)
+  }
+}
+
+object DataSetSingleRowJoinRule {
+  val INSTANCE: RelOptRule = new DataSetSingleRowJoinRule
+}
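
DataSetSingleRowJoinRule complements DataSetJoinRule: it covers inner joins
where one side is a global (non-grouped) aggregation and therefore guaranteed
to produce exactly one row, as in the plan of
SELECT * FROM t WHERE b < (SELECT MAX(a) FROM t2). Such a join needs no
equi-condition, since the single-row side can simply be broadcast to the
other side instead of being partitioned by key.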