Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:02 UTC

[33/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
deleted file mode 100644
index 7133773..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.table.plan.nodes.FlinkAggregate
-import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._
-import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
-import org.apache.flink.api.table.runtime.aggregate._
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-
-import scala.collection.JavaConverters._
-
-class DataStreamAggregate(
-    window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty],
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputNode: RelNode,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    rowRelDataType: RelDataType,
-    inputType: RelDataType,
-    grouping: Array[Int])
-  extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
-  with DataStreamRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamAggregate(
-      window,
-      namedProperties,
-      cluster,
-      traitSet,
-      inputs.get(0),
-      namedAggregates,
-      getRowType,
-      inputType,
-      grouping)
-  }
-
-  override def toString: String = {
-    s"Aggregate(${
-      if (!grouping.isEmpty) {
-        s"groupBy: (${groupingToString(inputType, grouping)}), "
-      } else {
-        ""
-      }
-    }window: ($window), " +
-      s"select: (${
-        aggregationToString(
-          inputType,
-          grouping,
-          getRowType,
-          namedAggregates,
-          namedProperties)
-      }))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
-      .item("window", window)
-      .item("select", aggregationToString(
-        inputType,
-        grouping,
-        getRowType,
-        namedAggregates,
-        namedProperties))
-  }
-
-  override def translateToPlan(
-    tableEnv: StreamTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
-    val config = tableEnv.getConfig
-    val groupingKeys = grouping.indices.toArray
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] =
-      getRowType.getFieldList.asScala
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-      .toArray
-
-    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
-    val aggString = aggregationToString(
-      inputType,
-      grouping,
-      getRowType,
-      namedAggregates,
-      namedProperties)
-
-    val prepareOpName = s"prepare select: ($aggString)"
-    val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
-      s"window: ($window), " +
-      s"select: ($aggString)"
-    val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
-
-    val mapFunction = AggregateUtil.createPrepareMapFunction(
-      namedAggregates,
-      grouping,
-      inputType)
-
-    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
-
-    val result: DataStream[Any] = {
-      // check whether all aggregates support partial aggregate
-      if (AggregateUtil.doAllSupportPartialAggregation(
-            namedAggregates.map(_.getKey),
-            inputType,
-            grouping.length)) {
-        // do Incremental Aggregation
-        val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
-          namedAggregates,
-          inputType,
-          getRowType,
-          grouping)
-        // grouped / keyed aggregation
-        if (groupingKeys.length > 0) {
-          val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-          val windowedStream =
-            createKeyedWindowedStream(window, keyedStream)
-            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-          windowedStream
-          .apply(reduceFunction, windowFunction)
-          .returns(rowTypeInfo)
-          .name(keyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
-        // global / non-keyed aggregation
-        else {
-          val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val windowedStream =
-            createNonKeyedWindowedStream(window, mappedInput)
-            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-          windowedStream
-          .apply(reduceFunction, windowFunction)
-          .returns(rowTypeInfo)
-          .name(nonKeyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
-      }
-      else {
-        // do non-Incremental Aggregation
-        // grouped / keyed aggregation
-        if (groupingKeys.length > 0) {
-
-          val windowFunction = AggregateUtil.createWindowAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-          val windowedStream =
-            createKeyedWindowedStream(window, keyedStream)
-            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-          windowedStream
-          .apply(windowFunction)
-          .returns(rowTypeInfo)
-          .name(keyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
-        // global / non-keyed aggregation
-        else {
-          val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val windowedStream =
-            createNonKeyedWindowedStream(window, mappedInput)
-            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-          windowedStream
-          .apply(windowFunction)
-          .returns(rowTypeInfo)
-          .name(nonKeyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
-      }
-    }
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(getConversionMapper(
-          config = config,
-          nullableInput = false,
-          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType = expectedType.get,
-          conversionOperatorName = "DataStreamAggregateConversion",
-          fieldNames = getRowType.getFieldNames.asScala
-        ))
-          .name(mapName)
-      case _ => result
-    }
-  }
-}
-
-object DataStreamAggregate {
-
-  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
-    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
-
-    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
-      stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
-
-    case ProcessingTimeTumblingGroupWindow(_, size) =>
-      stream.countWindow(asCount(size))
-
-    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
-      stream.window(TumblingEventTimeWindows.of(asTime(size)))
-
-    case EventTimeTumblingGroupWindow(_, _, size) =>
-      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
-      // before applying the windowing logic. Otherwise, this would be the same as a
-      // ProcessingTimeTumblingGroupWindow
-      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
-        "currently not supported.")
-
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
-      stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
-      stream.countWindow(asCount(size), asCount(slide))
-
-    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
-      stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
-    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
-      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
-      // before applying the windowing logic. Otherwise, this would be the same as a
-      // ProcessingTimeSlidingGroupWindow
-      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
-        "currently not supported.")
-
-    case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
-      stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
-    case EventTimeSessionGroupWindow(_, _, gap) =>
-      stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
-  }
-
-  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
-    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
-
-    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
-      stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
-
-    case ProcessingTimeTumblingGroupWindow(_, size) =>
-      stream.countWindowAll(asCount(size))
-
-    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
-      stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
-
-    case EventTimeTumblingGroupWindow(_, _, size) =>
-      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
-      // before applying the windowing logic. Otherwise, this would be the same as a
-      // ProcessingTimeTumblingGroupWindow
-      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
-        "currently not supported.")
-
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
-      stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
-      stream.countWindowAll(asCount(size), asCount(slide))
-
-    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
-      stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
-    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
-      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
-      // before applying the windowing logic. Otherwise, this would be the same as a
-      // ProcessingTimeSlidingGroupWindow
-      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
-        "currently not supported.")
-
-    case ProcessingTimeSessionGroupWindow(_, gap) =>
-      stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
-    case EventTimeSessionGroupWindow(_, _, gap) =>
-      stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
-  }
-
-  def asTime(expr: Expression): Time = expr match {
-    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
-    case _ => throw new IllegalArgumentException()
-  }
-
-  def asCount(expr: Expression): Long = expr match {
-    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
-    case _ => throw new IllegalArgumentException()
-  }
-}
-
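
For readers following the translation logic above: in createKeyedWindowedStream and
createNonKeyedWindowedStream, a time-interval window size is turned into a time-based
assigner via asTime, while a row-interval size becomes a count window via asCount. A
minimal, self-contained sketch of that mapping (not part of the commit; the literal
values are hypothetical):

import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

object WindowAssignerSketch {
  def main(args: Array[String]): Unit = {
    val size = 60000L   // hypothetical window size literal: 1 minute in milliseconds
    val slide = 10000L  // hypothetical slide literal: 10 seconds in milliseconds

    // time-interval sizes map to time-based window assigners (the asTime path)
    val tumbling = TumblingProcessingTimeWindows.of(Time.milliseconds(size))
    val sliding = SlidingProcessingTimeWindows.of(Time.milliseconds(size), Time.milliseconds(slide))
    println(s"$tumbling / $sliding")

    // row-interval sizes would instead take the asCount path,
    // i.e. stream.countWindow(...) on a KeyedStream
  }
}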

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
deleted file mode 100644
index 5312a5f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.nodes.FlinkCalc
-import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
-  * Flink RelNode that is translated into a FlatMap operator.
-  */
-class DataStreamCalc(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    rowRelDataType: RelDataType,
-    calcProgram: RexProgram,
-    ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
-  with FlinkCalc
-  with DataStreamRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamCalc(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      getRowType,
-      calcProgram,
-      ruleDescription
-    )
-  }
-
-  override def toString: String = calcToString(calcProgram, getExpressionString)
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .item("select", selectionToString(calcProgram, getExpressionString))
-      .itemIf("where",
-        conditionToString(calcProgram, getExpressionString),
-        calcProgram.getCondition != null)
-  }
-
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
-    val config = tableEnv.getConfig
-
-    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val generator = new CodeGenerator(config, false, inputDataStream.getType)
-
-    val body = functionBody(
-      generator,
-      inputDataStream.getType,
-      getRowType,
-      calcProgram,
-      config,
-      expectedType)
-
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
-      body,
-      returnType)
-
-    val mapFunc = calcMapFunction(genFunction)
-    inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
-  }
-}
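
The FlatMapFunction generated above applies the RexProgram's condition and projection in
a single pass. A hand-written stand-in for a hypothetical calc "WHERE f1 > 10 SELECT f0"
(a sketch only; the real function body is code-generated):

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class CalcSketch extends FlatMapFunction[Row, Row] {
  override def flatMap(in: Row, out: Collector[Row]): Unit = {
    // condition: f1 > 10 (hypothetical filter expression)
    if (in.getField(1).asInstanceOf[Int] > 10) {
      // projection: keep only field f0
      val result = new Row(1)
      result.setField(0, in.getField(0))
      out.collect(result)
    }
  }
}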

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
deleted file mode 100644
index 3b6a653..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan._
-
-class DataStreamConvention extends Convention {
-
-  override def toString: String = getName
-
-  override def useAbstractConvertersForConversion(
-    fromTraits: RelTraitSet,
-    toTraits: RelTraitSet): Boolean = false
-
-  override def canConvertConvention(toConvention: Convention): Boolean = false
-
-  def getInterface: Class[_] = classOf[DataStreamRel]
-
-  def getName: String = "DATASTREAM"
-
-  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
-
-  def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
-
-  def register(planner: RelOptPlanner): Unit = { }
-}
-
-object DataStreamConvention {
-
-  val INSTANCE = new DataStreamConvention
-}
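
In practice, conversion rules use this singleton to stamp plan fragments with the
DataStream convention by replacing the convention trait. A sketch of that pattern
(rel stands for an arbitrary logical RelNode):

import org.apache.calcite.plan.RelTraitSet
import org.apache.calcite.rel.RelNode

object ConventionSketch {
  def toDataStreamTraits(rel: RelNode): RelTraitSet =
    rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
}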

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
deleted file mode 100644
index 3bfa6e2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalTableFunctionScan
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.functions.utils.TableSqlFunction
-import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
-  * Flink RelNode that implements a join with a user-defined table function.
-  */
-class DataStreamCorrelate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputNode: RelNode,
-    scan: LogicalTableFunctionScan,
-    condition: Option[RexNode],
-    relRowType: RelDataType,
-    joinRowType: RelDataType,
-    joinType: SemiJoinType,
-    ruleDescription: String)
-  extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkCorrelate
-  with DataStreamRel {
-
-  override def deriveRowType() = relRowType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamCorrelate(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      scan,
-      condition,
-      relRowType,
-      joinRowType,
-      joinType,
-      ruleDescription)
-  }
-
-  override def toString: String = {
-    val rexCall = scan.getCall.asInstanceOf[RexCall]
-    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    correlateToString(rexCall, sqlFunction)
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    val rexCall = scan.getCall.asInstanceOf[RexCall]
-    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    super.explainTerms(pw)
-      .item("invocation", scan.getCall)
-      .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
-      .item("rowType", relRowType)
-      .item("joinType", joinType)
-      .itemIf("condition", condition.orNull, condition.isDefined)
-  }
-
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataStream[Any] = {
-
-    val config = tableEnv.getConfig
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    // we do not need to specify input type
-    val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
-    val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
-    val rexCall = funcRel.getCall.asInstanceOf[RexCall]
-    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
-    val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
-
-    val generator = new CodeGenerator(
-      config,
-      false,
-      inputDS.getType,
-      Some(udtfTypeInfo),
-      None,
-      Some(pojoFieldMapping))
-
-    val body = functionBody(
-      generator,
-      udtfTypeInfo,
-      getRowType,
-      rexCall,
-      condition,
-      config,
-      joinType,
-      expectedType)
-
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
-      body,
-      returnType)
-
-    val mapFunc = correlateMapFunction(genFunction)
-
-    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
-  }
-
-}
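
The generated correlate function evaluates the table function for each input row and
joins every emitted row with that input row. A hand-written stand-in with a hypothetical
string-splitting UDTF (a sketch; the real function body is code-generated):

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class CorrelateSketch extends FlatMapFunction[Row, Row] {
  // hypothetical table function: splits a string field into words
  private def split(s: String): Seq[String] = s.split(" ").toSeq

  override def flatMap(in: Row, out: Collector[Row]): Unit = {
    // CROSS JOIN semantics: inputs without table function results are dropped;
    // a LEFT join would emit the input row padded with nulls instead
    for (word <- split(in.getField(0).asInstanceOf[String])) {
      val joined = new Row(2)
      joined.setField(0, in.getField(0)) // input field
      joined.setField(1, word)           // table function result
      out.collect(joined)
    }
  }
}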

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
deleted file mode 100644
index 6cf13a5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamRel.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.{StreamTableEnvironment, TableConfig}
-import org.apache.flink.api.table.plan.nodes.FlinkRel
-import org.apache.flink.streaming.api.datastream.DataStream
-
-trait DataStreamRel extends RelNode with FlinkRel {
-
-  /**
-    * Translates the FlinkRelNode into a Flink operator.
-    *
-    * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
-    * @param expectedType specifies the type the Flink operator should return. The type must
-    *                     have the same arity as the result. For instance, if the
-    *                     expected type is a RowTypeInfo, this method will return a DataStream
-    *                     of type Row. If the expected type is Tuple2, the operator will return
-    *                     a Tuple2 if possible, and Row otherwise.
-    * @return DataStream of type expectedType or RowTypeInfo
-    */
-  def translateToPlan(
-    tableEnv: StreamTableEnvironment,
-    expectedType: Option[TypeInformation[Any]] = None): DataStream[Any]
-
-}
-
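
expectedType defaults to None, in which case operators produce Rows. A sketch of the
type information a caller could pass to be explicit about the result shape (the field
types are hypothetical):

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

object ExpectedTypeSketch {
  def main(args: Array[String]): Unit = {
    val expected: TypeInformation[_] = new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)
    // a DataStreamRel would then be translated with:
    //   rel.translateToPlan(tableEnv, Some(expected.asInstanceOf[TypeInformation[Any]]))
    println(expected)
  }
}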

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
deleted file mode 100644
index da83b64..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.plan.schema.DataStreamTable
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
-  * Flink RelNode that is translated into a DataStream source scan.
-  * It ensures that types without a deterministic field order (e.g. POJOs) are not part of
-  * the plan translation.
-  */
-class DataStreamScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable,
-    rowRelDataType: RelDataType)
-  extends StreamScan(cluster, traitSet, table) {
-
-  val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamScan(
-      cluster,
-      traitSet,
-      getTable,
-      getRowType
-    )
-  }
-
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
-    val config = tableEnv.getConfig
-    val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
-
-    convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
deleted file mode 100644
index f490d31..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelWriter, BiRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.streaming.api.datastream.DataStream
-
-import scala.collection.JavaConverters._
-
-/**
-  * Flink RelNode that is translated into a union of DataStreams.
-  */
-class DataStreamUnion(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    leftNode: RelNode,
-    rightNode: RelNode,
-    rowRelDataType: RelDataType)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
-  with DataStreamRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamUnion(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      inputs.get(1),
-      getRowType
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("union", unionSelectionToString)
-  }
-
-  override def toString: String = {
-    s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
-  }
-
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
-    val leftDataStream = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-    val rightDataStream = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-    leftDataStream.union(rightDataStream)
-  }
-
-  private def unionSelectionToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
-  }
-}
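
Translation boils down to a single DataStream#union call, which requires both inputs to
have the same type. A runnable sketch with the Scala DataStream API:

import org.apache.flink.streaming.api.scala._

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val left = env.fromElements(1, 2, 3)
    val right = env.fromElements(4, 5, 6)
    // both inputs must have the same type, mirroring the rel node above
    left.union(right).print()
    env.execute("union sketch")
  }
}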

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
deleted file mode 100644
index 3b98653..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Values
-import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.io.ValuesInputFormat
-import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.streaming.api.datastream.DataStream
-
-import scala.collection.JavaConverters._
-
-/**
-  * DataStream RelNode for LogicalValues.
-  */
-class DataStreamValues(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    rowRelDataType: RelDataType,
-    tuples: ImmutableList[ImmutableList[RexLiteral]],
-    ruleDescription: String)
-  extends Values(cluster, rowRelDataType, tuples, traitSet)
-  with DataStreamRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamValues(
-      cluster,
-      traitSet,
-      getRowType,
-      getTuples,
-      ruleDescription
-    )
-  }
-
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataStream[Any] = {
-
-    val config = tableEnv.getConfig
-
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val generator = new CodeGenerator(config)
-
-    // generate code for every record
-    val generatedRecords = getTuples.asScala.map { r =>
-      generator.generateResultExpression(
-        returnType,
-        getRowType.getFieldNames.asScala,
-        r.asScala)
-    }
-
-    // generate input format
-    val generatedFunction = generator.generateValuesInputFormat(
-      ruleDescription,
-      generatedRecords.map(_.code),
-      returnType)
-
-    val inputFormat = new ValuesInputFormat[Any](
-      generatedFunction.name,
-      generatedFunction.code,
-      generatedFunction.returnType)
-
-    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
-  }
-
-}
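
At runtime, LogicalValues amounts to a source that emits a fixed set of code-generated
records. A sketch of the equivalent effect with constant rows (Row.of and the element
values are illustrative assumptions):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

object ValuesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // constant records, comparable to what the generated input format emits
    env.fromElements(
      Row.of("a", Int.box(1)),
      Row.of("b", Int.box(2))
    ).print()
    env.execute("values sketch")
  }
}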

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
deleted file mode 100644
index b13770e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.schema.FlinkTable
-import org.apache.flink.api.table.runtime.MapRunner
-import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.streaming.api.datastream.DataStream
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-abstract class StreamScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable)
-  extends TableScan(cluster, traitSet, table)
-  with DataStreamRel {
-
-  protected def convertToExpectedType(
-      input: DataStream[Any],
-      flinkTable: FlinkTable[_],
-      expectedType: Option[TypeInformation[Any]],
-      config: TableConfig): DataStream[Any] = {
-
-    val inputType = input.getType
-
-    expectedType match {
-
-      // special case:
-      // if efficient type usage is enabled and no expected type is set
-      // we can simply forward the DataStream to the next operator.
-      // however, we cannot forward PojoTypes as their fields don't have an order
-      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
-        input
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != inputType) {
-          val generator = new CodeGenerator(
-            config,
-            nullableInput = false,
-            input.getType,
-            flinkTable.fieldIndexes)
-
-          val conversion = generator.generateConverterResultExpression(
-            determinedType,
-            getRowType.getFieldNames)
-
-          val body =
-            s"""
-               |${conversion.code}
-               |return ${conversion.resultTerm};
-               |""".stripMargin
-
-          val genFunction = generator.generateFunction(
-            "DataSetSourceConversion",
-            classOf[MapFunction[Any, Any]],
-            body,
-            determinedType)
-
-          val mapFunc = new MapRunner[Any, Any](
-            genFunction.name,
-            genFunction.code,
-            genFunction.returnType)
-
-          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          input.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          input
-        }
-    }
-  }
-}
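
The generated converter copies the input field by field into the determined return type.
A hand-written stand-in (a sketch; the real converter is code-generated, and the tuple
target shape is hypothetical):

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.types.Row

class RowToTupleSketch extends MapFunction[Row, (String, Int)] {
  override def map(value: Row): (String, Int) =
    // field-by-field copy, as in the generated converter result expression
    (value.getField(0).asInstanceOf[String],
      value.getField(1).asInstanceOf[Int])
}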

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
deleted file mode 100644
index 8201070..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.datastream
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.plan.schema.TableSourceTable
-import org.apache.flink.api.table.sources.StreamTableSource
-import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
-class StreamTableSourceScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable,
-    tableSource: StreamTableSource[_])
-  extends StreamScan(cluster, traitSet, table) {
-
-  override def deriveRowType() = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
-  }
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new StreamTableSourceScan(
-      cluster,
-      traitSet,
-      getTable,
-      tableSource
-    )
-  }
-
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
-
-    val config = tableEnv.getConfig
-    val inputDataStream: DataStream[Any] = tableSource
-      .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-
-    convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
deleted file mode 100644
index ee515c9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules
-
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan
-import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand}
-import org.apache.calcite.rel.logical.LogicalTableScan
-
-/**
- * Rule that converts an EnumerableTableScan into a LogicalTableScan.
- * We need this rule because Calcite creates an EnumerableTableScan
- * when parsing a SQL query. We convert it into a LogicalTableScan
- * so we can merge the optimization process with any plan that might be created
- * by the Table API.
- */
-class EnumerableToLogicalTableScan(
-    operand: RelOptRuleOperand,
-    description: String) extends RelOptRule(operand, description) {
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
-    val table = oldRel.getTable
-    val newRel = LogicalTableScan.create(oldRel.getCluster, table)
-    call.transformTo(newRel)
-  }
-}
-
-object EnumerableToLogicalTableScan {
-  val INSTANCE = new EnumerableToLogicalTableScan(
-    operand(classOf[EnumerableTableScan], any),
-    "EnumerableToLogicalTableScan")
-}
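
Both rule sets in the following file register this rule right after TableScanRule, so
the EnumerableTableScan produced by SQL parsing is normalized before optimization. A
sketch of that wiring:

import org.apache.calcite.rel.rules.TableScanRule
import org.apache.calcite.tools.{RuleSet, RuleSets}

object ScanRuleSketch {
  val rules: RuleSet = RuleSets.ofList(
    // convert logical table scans, then rewrite Enumerable scans back to logical ones
    TableScanRule.INSTANCE,
    EnumerableToLogicalTableScan.INSTANCE)
}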

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
deleted file mode 100644
index 183065c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules
-
-import org.apache.calcite.rel.rules._
-import org.apache.calcite.tools.{RuleSets, RuleSet}
-import org.apache.flink.api.table.plan.rules.dataSet._
-import org.apache.flink.api.table.plan.rules.datastream._
-
-object FlinkRuleSets {
-
-  /**
-    * RuleSet to optimize plans for batch / DataSet execution
-    */
-  val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
-
-    // convert a logical table scan to a relational expression
-    TableScanRule.INSTANCE,
-    EnumerableToLogicalTableScan.INSTANCE,
-
-    // push a filter into a join
-    FilterJoinRule.FILTER_ON_JOIN,
-    // push filter into the children of a join
-    FilterJoinRule.JOIN,
-    // push filter through an aggregation
-    FilterAggregateTransposeRule.INSTANCE,
-
-    // aggregation and projection rules
-    AggregateProjectMergeRule.INSTANCE,
-    AggregateProjectPullUpConstantsRule.INSTANCE,
-    // push a projection past a filter or vice versa
-    ProjectFilterTransposeRule.INSTANCE,
-    FilterProjectTransposeRule.INSTANCE,
-    // push a projection to the children of a join
-    ProjectJoinTransposeRule.INSTANCE,
-    // remove identity project
-    ProjectRemoveRule.INSTANCE,
-    // reorder sort and projection
-    SortProjectTransposeRule.INSTANCE,
-    ProjectSortTransposeRule.INSTANCE,
-
-    // join rules
-    JoinPushExpressionsRule.INSTANCE,
-
-    // remove union with only a single child
-    UnionEliminatorRule.INSTANCE,
-    // convert non-all union into all-union + distinct
-    UnionToDistinctRule.INSTANCE,
-
-    // remove aggregation if it does not aggregate and input is already distinct
-    AggregateRemoveRule.INSTANCE,
-    // push aggregate through join
-    AggregateJoinTransposeRule.EXTENDED,
-    // aggregate union rule
-    AggregateUnionAggregateRule.INSTANCE,
-
-    // remove unnecessary sort rule
-    SortRemoveRule.INSTANCE,
-
-    // simplify expressions rules
-    ReduceExpressionsRule.FILTER_INSTANCE,
-    ReduceExpressionsRule.PROJECT_INSTANCE,
-    ReduceExpressionsRule.CALC_INSTANCE,
-    ReduceExpressionsRule.JOIN_INSTANCE,
-
-    // prune empty results rules
-    PruneEmptyRules.AGGREGATE_INSTANCE,
-    PruneEmptyRules.FILTER_INSTANCE,
-    PruneEmptyRules.JOIN_LEFT_INSTANCE,
-    PruneEmptyRules.JOIN_RIGHT_INSTANCE,
-    PruneEmptyRules.PROJECT_INSTANCE,
-    PruneEmptyRules.SORT_INSTANCE,
-    PruneEmptyRules.UNION_INSTANCE,
-
-    // calc rules
-    FilterCalcMergeRule.INSTANCE,
-    ProjectCalcMergeRule.INSTANCE,
-    FilterToCalcRule.INSTANCE,
-    ProjectToCalcRule.INSTANCE,
-    CalcMergeRule.INSTANCE,
-
-    // translate to Flink DataSet nodes
-    DataSetAggregateRule.INSTANCE,
-    DataSetAggregateWithNullValuesRule.INSTANCE,
-    DataSetCalcRule.INSTANCE,
-    DataSetJoinRule.INSTANCE,
-    DataSetSingleRowJoinRule.INSTANCE,
-    DataSetScanRule.INSTANCE,
-    DataSetUnionRule.INSTANCE,
-    DataSetIntersectRule.INSTANCE,
-    DataSetMinusRule.INSTANCE,
-    DataSetSortRule.INSTANCE,
-    DataSetValuesRule.INSTANCE,
-    DataSetCorrelateRule.INSTANCE,
-    BatchTableSourceScanRule.INSTANCE,
-    // project pushdown optimization
-    PushProjectIntoBatchTableSourceScanRule.INSTANCE
-  )
-
-  /**
-    * RuleSet to optimize plans for stream / DataStream execution
-    */
-  val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
-
-      // convert a logical table scan to a relational expression
-      TableScanRule.INSTANCE,
-      EnumerableToLogicalTableScan.INSTANCE,
-
-      // calc rules
-      FilterToCalcRule.INSTANCE,
-      ProjectToCalcRule.INSTANCE,
-      FilterCalcMergeRule.INSTANCE,
-      ProjectCalcMergeRule.INSTANCE,
-      CalcMergeRule.INSTANCE,
-
-      // prune empty results rules
-      PruneEmptyRules.FILTER_INSTANCE,
-      PruneEmptyRules.PROJECT_INSTANCE,
-      PruneEmptyRules.UNION_INSTANCE,
-
-      // push and merge projection rules
-      ProjectFilterTransposeRule.INSTANCE,
-      FilterProjectTransposeRule.INSTANCE,
-      ProjectRemoveRule.INSTANCE,
-
-      // simplify expressions rules
-      ReduceExpressionsRule.FILTER_INSTANCE,
-      ReduceExpressionsRule.PROJECT_INSTANCE,
-      ReduceExpressionsRule.CALC_INSTANCE,
-
-      // merge and push unions rules
-      UnionEliminatorRule.INSTANCE,
-
-      // translate to DataStream nodes
-      DataStreamAggregateRule.INSTANCE,
-      DataStreamCalcRule.INSTANCE,
-      DataStreamScanRule.INSTANCE,
-      DataStreamUnionRule.INSTANCE,
-      DataStreamValuesRule.INSTANCE,
-      DataStreamCorrelateRule.INSTANCE,
-      StreamTableSourceScanRule.INSTANCE
-  )
-
-}
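
Since RuleSet is an Iterable of RelOptRule, deriving a variant of one of these sets is
straightforward. A sketch (withExtraRule is hypothetical, not part of this commit):

import scala.collection.JavaConverters._

import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.tools.{RuleSet, RuleSets}

object RuleSetSketch {
  // append one rule to an existing set, e.g. FlinkRuleSets.DATASTREAM_OPT_RULES
  def withExtraRule(base: RuleSet, extra: RelOptRule): RuleSet =
    RuleSets.ofList(base.asScala.toSeq :+ extra: _*)
}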

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
deleted file mode 100644
index 8e3d8bb..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetConvention}
-import org.apache.flink.api.table.plan.schema.TableSourceTable
-import org.apache.flink.api.table.sources.BatchTableSource
-
-/** Rule to convert a [[LogicalTableScan]] into a [[BatchTableSourceScan]]. */
-class BatchTableSourceScanRule
-  extends ConverterRule(
-      classOf[LogicalTableScan],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "BatchTableSourceScanRule")
-  {
-
-  /** The rule matches only if the [[TableScan]] targets a [[BatchTableSource]]. */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
-    dataSetTable match {
-      case tst: TableSourceTable =>
-        tst.tableSource match {
-          case _: BatchTableSource[_] =>
-            true
-          case _ =>
-            false
-        }
-      case _ =>
-        false
-    }
-  }
-
-  def convert(rel: RelNode): RelNode = {
-    val scan: TableScan = rel.asInstanceOf[TableScan]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-
-    val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource
-      .asInstanceOf[BatchTableSource[_]]
-    new BatchTableSourceScan(
-      rel.getCluster,
-      traitSet,
-      scan.getTable,
-      tableSource
-    )
-  }
-}
-
-object BatchTableSourceScanRule {
-  val INSTANCE: RelOptRule = new BatchTableSourceScanRule
-}
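
The matches() method above relies on Calcite's RelOptTable.unwrap, which returns null when the table is not backed by the requested class. Reduced to a standalone predicate, the check looks roughly like this (a sketch against the pre-refactoring packages shown in this diff):

  import org.apache.calcite.rel.core.TableScan
  import org.apache.flink.api.table.plan.schema.TableSourceTable
  import org.apache.flink.api.table.sources.BatchTableSource

  object ScanMatching {

    // True if the scanned table wraps a BatchTableSource; unwrap yields null
    // (which falls through to the default case) for any other table kind.
    def scansBatchTableSource(scan: TableScan): Boolean =
      scan.getTable.unwrap(classOf[TableSourceTable]) match {
        case tst: TableSourceTable => tst.tableSource.isInstanceOf[BatchTableSource[_]]
        case _ => false
      }
  }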

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
deleted file mode 100644
index 0311c48..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.flink.api.table.TableException
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
-import scala.collection.JavaConversions._
-
-class DataSetAggregateRule
-  extends ConverterRule(
-      classOf[LogicalAggregate],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "DataSetAggregateRule")
-  {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
-
-    // a non-grouped aggregate needs a null row attached to its input data;
-    // that case is handled by DataSetAggregateWithNullValuesRule instead
-    if (agg.getGroupSet.isEmpty) {
-      return false
-    }
-
-    // check if we have distinct aggregates
-    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
-    if (distinctAggs) {
-      throw TableException("DISTINCT aggregates are currently not supported.")
-    }
-
-    // check if we have grouping sets
-    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
-    if (groupSets || agg.indicator) {
-      throw TableException("GROUPING SETS are currently not supported.")
-    }
-
-    !distinctAggs && !groupSets && !agg.indicator
-  }
-
-  override def convert(rel: RelNode): RelNode = {
-    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
-
-    new DataSetAggregate(
-      rel.getCluster,
-      traitSet,
-      convInput,
-      agg.getNamedAggCalls,
-      rel.getRowType,
-      agg.getInput.getRowType,
-      agg.getGroupSet.toArray)
-  }
-}
-
-object DataSetAggregateRule {
-  val INSTANCE: RelOptRule = new DataSetAggregateRule
-}
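
Together with the following rule, this gives a simple dispatch on the group set: grouped aggregates are translated here, global aggregates get the null-row treatment. A sketch of that decision (the object name is hypothetical):

  import org.apache.calcite.rel.logical.LogicalAggregate

  object AggregateRuleDispatch {

    // A grouped aggregate is handled by DataSetAggregateRule; a global
    // (non-grouped) aggregate by DataSetAggregateWithNullValuesRule.
    def responsibleRule(agg: LogicalAggregate): String =
      if (agg.getGroupSet.isEmpty) "DataSetAggregateWithNullValuesRule"
      else "DataSetAggregateRule"
  }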

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
deleted file mode 100644
index 3bf3e0c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan._
-import scala.collection.JavaConversions._
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate}
-import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.table._
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
-
-/**
-  * Rule that injects a [[Row]] with null values into the input of a [[DataSetAggregate]].
-  * The rule applies only to non-grouped aggregate queries.
-  */
-class DataSetAggregateWithNullValuesRule
-  extends ConverterRule(
-    classOf[LogicalAggregate],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetAggregateWithNullValuesRule")
-{
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
-
-    // a grouped aggregate must not get a null row attached to its input;
-    // such queries are handled by other rules, e.g. [[DataSetAggregateRule]]
-    if (!agg.getGroupSet.isEmpty) {
-      return false
-    }
-
-    // check if we have distinct aggregates
-    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
-    if (distinctAggs) {
-      throw TableException("DISTINCT aggregates are currently not supported.")
-    }
-
-    // check if we have grouping sets
-    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
-    if (groupSets || agg.indicator) {
-      throw TableException("GROUPING SETS are currently not supported.")
-    }
-    !distinctAggs && !groupSets && !agg.indicator
-  }
-
-  override def convert(rel: RelNode): RelNode = {
-    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val cluster: RelOptCluster = rel.getCluster
-
-    val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType)
-    val nullLiterals: ImmutableList[ImmutableList[RexLiteral]] =
-      ImmutableList.of(ImmutableList.copyOf[RexLiteral](
-        for (fieldType <- fieldTypes)
-          yield {
-            cluster.getRexBuilder.
-              makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral]
-          }))
-
-    val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
-    val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
-
-    new DataSetAggregate(
-      cluster,
-      traitSet,
-      RelOptRule.convert(logicalUnion, DataSetConvention.INSTANCE),
-      agg.getNamedAggCalls,
-      rel.getRowType,
-      agg.getInput.getRowType,
-      agg.getGroupSet.toArray
-    )
-  }
-}
-
-object DataSetAggregateWithNullValuesRule {
-  val INSTANCE: RelOptRule = new DataSetAggregateWithNullValuesRule
-}
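
The unioned null row exists because a global aggregate must emit exactly one result row even on empty input (e.g. SELECT COUNT(*) FROM t returns 0, not nothing). A rough analogy in plain Scala collections, not the actual runtime code:

  object NullRowSemantics {

    // Appending one null element mirrors the LogicalUnion with the null row:
    // the input is never empty, yet null-ignoring aggregates still see 0 values.
    def countIgnoringNulls(input: Seq[Integer]): Long = {
      val withNullRow = input :+ (null: Integer)
      withNullRow.count(_ != null)
    }
  }

  // NullRowSemantics.countIgnoringNulls(Seq()) == 0, but a row is still produced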

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
deleted file mode 100644
index 88e74a9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCalcRule.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalCalc
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
-
-class DataSetCalcRule
-  extends ConverterRule(
-      classOf[LogicalCalc],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "DataSetCalcRule")
-  {
-
-    def convert(rel: RelNode): RelNode = {
-      val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE)
-
-      new DataSetCalc(
-        rel.getCluster,
-        traitSet,
-        convInput,
-        rel.getRowType,
-        calc.getProgram,
-        description)
-    }
-  }
-
-object DataSetCalcRule {
-  val INSTANCE: RelOptRule = new DataSetCalcRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
deleted file mode 100644
index 39756be..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate}
-
-/**
-  * Rule to convert a [[LogicalCorrelate]] into a [[DataSetCorrelate]].
-  */
-class DataSetCorrelateRule
-  extends ConverterRule(
-      classOf[LogicalCorrelate],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "DataSetCorrelateRule") {
-
-    override def matches(call: RelOptRuleCall): Boolean = {
-      val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
-      val right = join.getRight.asInstanceOf[RelSubset].getOriginal
-
-      right match {
-        // right node is a table function
-        case scan: LogicalTableFunctionScan => true
-        // a filter is pushed above the table function
-        case filter: LogicalFilter =>
-          filter
-            .getInput.asInstanceOf[RelSubset]
-            .getOriginal
-            .isInstanceOf[LogicalTableFunctionScan]
-        case _ => false
-      }
-    }
-
-    override def convert(rel: RelNode): RelNode = {
-      val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
-      val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-      val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
-      val right: RelNode = join.getInput(1)
-
-      def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataSetCorrelate = {
-        relNode match {
-          case rel: RelSubset =>
-            convertToCorrelate(rel.getRelList.get(0), condition)
-
-          case filter: LogicalFilter =>
-            convertToCorrelate(
-              filter.getInput.asInstanceOf[RelSubset].getOriginal,
-              Some(filter.getCondition))
-
-          case scan: LogicalTableFunctionScan =>
-            new DataSetCorrelate(
-              rel.getCluster,
-              traitSet,
-              convInput,
-              scan,
-              condition,
-              rel.getRowType,
-              join.getRowType,
-              join.getJoinType,
-              description)
-        }
-      }
-      convertToCorrelate(right, None)
-    }
-  }
-
-object DataSetCorrelateRule {
-  val INSTANCE: RelOptRule = new DataSetCorrelateRule
-}
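
The convertToCorrelate helper has to look through volcano-planner RelSubset wrappers to find the table function scan. The same traversal, isolated as a standalone sketch:

  import org.apache.calcite.plan.volcano.RelSubset
  import org.apache.calcite.rel.RelNode
  import org.apache.calcite.rel.logical.{LogicalFilter, LogicalTableFunctionScan}

  object CorrelateInputs {

    // Peel off RelSubset and LogicalFilter layers until the table function
    // scan (if any) is reached; None means the correlate is not translatable.
    def findTableFunctionScan(node: RelNode): Option[LogicalTableFunctionScan] =
      node match {
        case subset: RelSubset => findTableFunctionScan(subset.getOriginal)
        case filter: LogicalFilter => findTableFunctionScan(filter.getInput)
        case scan: LogicalTableFunctionScan => Some(scan)
        case _ => None
      }
  }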

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
deleted file mode 100644
index c0e3269..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalIntersect
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetIntersect, DataSetConvention}
-
-class DataSetIntersectRule
-  extends ConverterRule(
-    classOf[LogicalIntersect],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetIntersectRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-
-    val intersect: LogicalIntersect = rel.asInstanceOf[LogicalIntersect]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(intersect.getInput(0), DataSetConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(intersect.getInput(1), DataSetConvention.INSTANCE)
-
-    new DataSetIntersect(
-      rel.getCluster,
-      traitSet,
-      convLeft,
-      convRight,
-      rel.getRowType,
-      intersect.all)
-  }
-}
-
-object DataSetIntersectRule {
-  val INSTANCE: RelOptRule = new DataSetIntersectRule
-}
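
The `all` flag forwarded to DataSetIntersect carries the usual bag semantics of INTERSECT ALL. A plain-Scala sketch of the contract the translated operator has to satisfy (illustrative only):

  object SetOpSemantics {

    // INTERSECT ALL keeps each value min(leftCount, rightCount) times;
    // plain INTERSECT deduplicates first.
    def intersect[T](left: Seq[T], right: Seq[T], all: Boolean): Seq[T] =
      if (all) {
        val rightCounts = right.groupBy(identity).mapValues(_.size)
        left.groupBy(identity).toSeq.flatMap { case (v, occs) =>
          Seq.fill(math.min(occs.size, rightCounts.getOrElse(v, 0)))(v)
        }
      } else {
        left.distinct.filter(right.contains)
      }
  }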

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
deleted file mode 100644
index 3fab8bf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetJoinRule.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalJoin
-
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention}
-
-import scala.collection.JavaConversions._
-
-class DataSetJoinRule
-  extends ConverterRule(
-      classOf[LogicalJoin],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "DataSetJoinRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join: LogicalJoin = call.rel(0).asInstanceOf[LogicalJoin]
-
-    val joinInfo = join.analyzeCondition
-
-    // joins require an equi-condition or a conjunctive predicate with at least one equi-condition
-    !joinInfo.pairs().isEmpty
-  }
-
-  override def convert(rel: RelNode): RelNode = {
-
-    val join: LogicalJoin = rel.asInstanceOf[LogicalJoin]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
-    val joinInfo = join.analyzeCondition
-
-    new DataSetJoin(
-      rel.getCluster,
-      traitSet,
-      convLeft,
-      convRight,
-      rel.getRowType,
-      join.getCondition,
-      join.getRowType,
-      joinInfo,
-      joinInfo.pairs.toList,
-      join.getJoinType,
-      null,
-      description)
-  }
-
-}
-
-object DataSetJoinRule {
-  val INSTANCE: RelOptRule = new DataSetJoinRule
-}
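
The matches() guard above is what rejects pure theta joins: analyzeCondition extracts the equi-join key pairs, and translation requires at least one. As a standalone predicate:

  import org.apache.calcite.rel.core.Join

  object JoinMatching {

    // "a.id = b.id AND a.x > b.y" yields one key pair and matches;
    // a bare "a.x > b.y" yields none and is rejected.
    def hasEquiCondition(join: Join): Boolean =
      !join.analyzeCondition().pairs().isEmpty
  }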

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
deleted file mode 100644
index 44bead0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalMinus
-import org.apache.calcite.rel.rules.UnionToDistinctRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus}
-
-class DataSetMinusRule
-  extends ConverterRule(
-    classOf[LogicalMinus],
-    Convention.NONE,
-    DataSetConvention.INSTANCE,
-    "DataSetMinusRule")
-{
-
-  def convert(rel: RelNode): RelNode = {
-
-    val minus: LogicalMinus = rel.asInstanceOf[LogicalMinus]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(minus.getInput(0), DataSetConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(minus.getInput(1), DataSetConvention.INSTANCE)
-
-    new DataSetMinus(
-      rel.getCluster,
-      traitSet,
-      convLeft,
-      convRight,
-      rel.getRowType,
-      minus.all)
-  }
-}
-
-object DataSetMinusRule {
-  val INSTANCE: RelOptRule = new DataSetMinusRule
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
deleted file mode 100644
index 7477690..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetScan}
-import org.apache.flink.api.table.plan.schema.DataSetTable
-
-class DataSetScanRule
-  extends ConverterRule(
-      classOf[LogicalTableScan],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "DataSetScanRule")
-  {
-
-  /**
-    * The rule matches only if the input table is a [[DataSetTable]]; otherwise the
-    * generic TableScanRule should match instead.
-    */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[DataSetTable[Any]])
-    dataSetTable match {
-      case _: DataSetTable[Any] =>
-        true
-      case _ =>
-        false
-    }
-  }
-
-  def convert(rel: RelNode): RelNode = {
-    val scan: TableScan = rel.asInstanceOf[TableScan]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-
-    new DataSetScan(
-      rel.getCluster,
-      traitSet,
-      scan.getTable,
-      rel.getRowType
-    )
-  }
-}
-
-object DataSetScanRule {
-  val INSTANCE: RelOptRule = new DataSetScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
deleted file mode 100644
index 8109fcf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowJoinRule.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowJoin}
-
-class DataSetSingleRowJoinRule
-  extends ConverterRule(
-      classOf[LogicalJoin],
-      Convention.NONE,
-      DataSetConvention.INSTANCE,
-      "DataSetSingleRowCrossRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join = call.rel(0).asInstanceOf[LogicalJoin]
-
-    if (isInnerJoin(join)) {
-      isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
-        isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
-    } else {
-      false
-    }
-  }
-
-  private def isInnerJoin(join: LogicalJoin) = {
-    join.getJoinType == JoinRelType.INNER
-  }
-
-  private def isGlobalAggregation(node: RelNode) = {
-    node.isInstanceOf[LogicalAggregate] &&
-      isSingleRow(node.asInstanceOf[LogicalAggregate])
-  }
-
-  private def isSingleRow(agg: LogicalAggregate) = {
-    agg.getGroupSet.isEmpty
-  }
-
-  override def convert(rel: RelNode): RelNode = {
-    val join = rel.asInstanceOf[LogicalJoin]
-    val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE)
-    val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE)
-    val leftIsSingle = isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
-
-    new DataSetSingleRowJoin(
-      rel.getCluster,
-      traitSet,
-      dataSetLeftNode,
-      dataSetRightNode,
-      leftIsSingle,
-      rel.getRowType,
-      join.getCondition,
-      join.getRowType,
-      description)
-  }
-}
-
-object DataSetSingleRowJoinRule {
-  val INSTANCE: RelOptRule = new DataSetSingleRowJoinRule
-}
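
The single-row property exploited by this rule comes from global aggregations: an aggregate with an empty group set always emits exactly one row, which is what makes a broadcast-style join safe. Condensed into a sketch:

  import org.apache.calcite.rel.RelNode
  import org.apache.calcite.rel.logical.LogicalAggregate

  object SingleRowDetection {

    // e.g. the subquery in  SELECT * FROM t, (SELECT COUNT(*) FROM t)
    // is a LogicalAggregate with an empty group set and yields one row.
    def producesSingleRow(node: RelNode): Boolean = node match {
      case agg: LogicalAggregate => agg.getGroupSet.isEmpty
      case _ => false
    }
  }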