You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:03 UTC

[34/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
deleted file mode 100644
index 7932e11..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes
-
-import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.MapRunner
-
-import scala.collection.JavaConversions._
-
-trait FlinkRel {
-
-  private[flink] def getExpressionString(
-    expr: RexNode,
-    inFields: List[String],
-    localExprsTable: Option[List[RexNode]]): String = {
-
-    expr match {
-      case i: RexInputRef =>
-        inFields.get(i.getIndex)
-
-      case l: RexLiteral =>
-        l.toString
-
-      case l: RexLocalRef if localExprsTable.isEmpty =>
-        throw new IllegalArgumentException("Encountered RexLocalRef without " +
-          "local expression table")
-
-      case l: RexLocalRef =>
-        val lExpr = localExprsTable.get(l.getIndex)
-        getExpressionString(lExpr, inFields, localExprsTable)
-
-      case c: RexCall =>
-        val op = c.getOperator.toString
-        val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable))
-        s"$op(${ops.mkString(", ")})"
-
-      case fa: RexFieldAccess =>
-        val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable)
-        val field = fa.getField.getName
-        s"$referenceExpr.$field"
-
-      case _ =>
-        throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
-    }
-  }
-
-  private[flink] def getConversionMapper(
-      config: TableConfig,
-      nullableInput: Boolean,
-      inputType: TypeInformation[Any],
-      expectedType: TypeInformation[Any],
-      conversionOperatorName: String,
-      fieldNames: Seq[String],
-      inputPojoFieldMapping: Option[Array[Int]] = None)
-    : MapFunction[Any, Any] = {
-
-    val generator = new CodeGenerator(
-      config,
-      nullableInput,
-      inputType,
-      None,
-      inputPojoFieldMapping)
-    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
-    val body =
-      s"""
-         |${conversion.code}
-         |return ${conversion.resultTerm};
-         |""".stripMargin
-
-    val genFunction = generator.generateFunction(
-      conversionOperatorName,
-      classOf[MapFunction[Any, Any]],
-      body,
-      expectedType)
-
-    new MapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
deleted file mode 100644
index a6de237..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.plan.schema.FlinkTable
-import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-abstract class BatchScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable)
-  extends TableScan(cluster, traitSet, table)
-  with DataSetRel {
-
-  override def toString: String = {
-    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
-    val rowCnt = metadata.getRowCount(this)
-    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
-  }
-
-  protected def convertToExpectedType(
-      input: DataSet[Any],
-      flinkTable: FlinkTable[_],
-      expectedType: Option[TypeInformation[Any]],
-      config: TableConfig): DataSet[Any] = {
-
-    val inputType = input.getType
-
-    expectedType match {
-
-      // special case:
-      // if efficient type usage is enabled and no expected type is set
-      // we can simply forward the DataSet to the next operator.
-      // however, we cannot forward PojoTypes as their fields don't have an order
-      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
-        input
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != inputType) {
-
-          val mapFunc = getConversionMapper(
-            config,
-            nullableInput = false,
-            inputType,
-            determinedType,
-            "DataSetSourceConversion",
-            getRowType.getFieldNames,
-            Some(flinkTable.fieldIndexes))
-
-          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          input.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          input
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
deleted file mode 100644
index e368219..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
-import org.apache.flink.api.table.plan.schema.TableSourceTable
-import org.apache.flink.api.table.sources.BatchTableSource
-
-/** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
-class BatchTableSourceScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable,
-    val tableSource: BatchTableSource[_])
-  extends BatchScan(cluster, traitSet, table) {
-
-  override def deriveRowType() = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-    val rowCnt = metadata.getRowCount(this)
-    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
-  }
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new BatchTableSourceScan(
-      cluster,
-      traitSet,
-      getTable,
-      tableSource
-    )
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .item("fields", tableSource.getFieldsNames.mkString(", "))
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val config = tableEnv.getConfig
-    val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
-
-    convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
deleted file mode 100644
index 94513d9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.nodes.FlinkAggregate
-import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
-import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
-import org.apache.flink.types.Row
-
-import scala.collection.JavaConverters._
-
-/**
-  * Flink RelNode which matches along with a LogicalAggregate.
-  */
-class DataSetAggregate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputNode: RelNode,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    rowRelDataType: RelDataType,
-    inputType: RelDataType,
-    grouping: Array[Int])
-  extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
-  with DataSetRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetAggregate(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      namedAggregates,
-      getRowType,
-      inputType,
-      grouping)
-  }
-
-  override def toString: String = {
-    s"Aggregate(${ if (!grouping.isEmpty) {
-      s"groupBy: (${groupingToString(inputType, grouping)}), "
-    } else {
-      ""
-    }}select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)}))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
-      .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil))
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
-    val child = this.getInput
-    val rowCnt = metadata.getRowCount(child)
-    val rowSize = this.estimateRowSize(child.getRowType)
-    val aggCnt = this.namedAggregates.size
-    planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
-  }
-
-  override def translateToPlan(
-    tableEnv: BatchTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val config = tableEnv.getConfig
-
-    val groupingKeys = grouping.indices.toArray
-
-    val mapFunction = AggregateUtil.createPrepareMapFunction(
-      namedAggregates,
-      grouping,
-      inputType)
-
-    val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
-      namedAggregates,
-      inputType,
-      rowRelDataType,
-      grouping)
-
-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
-    .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-    .toArray
-
-    val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
-    val prepareOpName = s"prepare select: ($aggString)"
-    val mappedInput = inputDS
-      .map(mapFunction)
-      .name(prepareOpName)
-
-    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
-    val result = {
-      if (groupingKeys.length > 0) {
-        // grouped aggregation
-        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
-          s"select: ($aggString)"
-
-        mappedInput.asInstanceOf[DataSet[Row]]
-          .groupBy(groupingKeys: _*)
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataSet[Any]]
-      }
-      else {
-        // global aggregation
-        val aggOpName = s"select:($aggString)"
-        mappedInput.asInstanceOf[DataSet[Row]]
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataSet[Any]]
-      }
-    }
-
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(getConversionMapper(
-          config = config,
-          nullableInput = false,
-          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType = expectedType.get,
-          conversionOperatorName = "DataSetAggregateConversion",
-          fieldNames = getRowType.getFieldNames.asScala
-        ))
-        .name(mapName)
-      case _ => result
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
deleted file mode 100644
index c0881b7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptPlanner, RelOptCost, RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.nodes.FlinkCalc
-import org.apache.flink.api.table.typeutils.TypeConverter
-import TypeConverter._
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConverters._
-
-/**
-  * Flink RelNode which matches along with LogicalCalc.
-  *
-  */
-class DataSetCalc(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    rowRelDataType: RelDataType,
-    private[flink] val calcProgram: RexProgram, // for tests
-    ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
-  with FlinkCalc
-  with DataSetRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetCalc(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      getRowType,
-      calcProgram,
-      ruleDescription)
-  }
-
-  override def toString: String = calcToString(calcProgram, getExpressionString)
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .item("select", selectionToString(calcProgram, getExpressionString))
-      .itemIf("where",
-        conditionToString(calcProgram, getExpressionString),
-        calcProgram.getCondition != null)
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
-    val child = this.getInput
-    val rowCnt = metadata.getRowCount(child)
-
-    // compute number of expressions that do not access a field or literal, i.e. computations,
-    //   conditions, etc. We only want to account for computations, not for simple projections.
-    val compCnt = calcProgram.getExprList.asScala.toList.count {
-      case i: RexInputRef => false
-      case l: RexLiteral => false
-      case _ => true
-    }
-
-    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
-  }
-
-  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
-    val child = this.getInput
-    val rowCnt = metadata.getRowCount(child)
-
-    if (calcProgram.getCondition != null) {
-      // we reduce the result card to push filters down
-      (rowCnt * 0.75).min(1.0)
-    } else {
-      rowCnt
-    }
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val config = tableEnv.getConfig
-
-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val generator = new CodeGenerator(config, false, inputDS.getType)
-
-    val body = functionBody(
-      generator,
-      inputDS.getType,
-      getRowType,
-      calcProgram,
-      config,
-      expectedType)
-
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
-      body,
-      returnType)
-
-    val mapFunc = calcMapFunction(genFunction)
-    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
deleted file mode 100644
index 03d9a51..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-
-class DataSetConvention extends Convention {
-
-  override def toString: String = getName
-
-  override def useAbstractConvertersForConversion(
-    fromTraits: RelTraitSet,
-    toTraits: RelTraitSet): Boolean = false
-
-  override def canConvertConvention(toConvention: Convention): Boolean = false
-
-  def getInterface: Class[_] = classOf[DataSetRel]
-
-  def getName: String = "DATASET"
-
-  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
-
-  def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
-
-  def register(planner: RelOptPlanner): Unit = { }
-}
-
-object DataSetConvention {
-
-  val INSTANCE = new DataSetConvention
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
deleted file mode 100644
index 95eb15b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalTableFunctionScan
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexNode, RexCall}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.functions.utils.TableSqlFunction
-import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-/**
-  * Flink RelNode which matches along with join a user defined table function.
-  */
-class DataSetCorrelate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputNode: RelNode,
-    scan: LogicalTableFunctionScan,
-    condition: Option[RexNode],
-    relRowType: RelDataType,
-    joinRowType: RelDataType,
-    joinType: SemiJoinType,
-    ruleDescription: String)
-  extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkCorrelate
-  with DataSetRel {
-
-  override def deriveRowType() = relRowType
-
-  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-    val rowCnt = metadata.getRowCount(getInput) * 1.5
-    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 0.5)
-  }
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetCorrelate(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      scan,
-      condition,
-      relRowType,
-      joinRowType,
-      joinType,
-      ruleDescription)
-  }
-
-  override def toString: String = {
-    val rexCall = scan.getCall.asInstanceOf[RexCall]
-    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    correlateToString(rexCall, sqlFunction)
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    val rexCall = scan.getCall.asInstanceOf[RexCall]
-    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    super.explainTerms(pw)
-      .item("invocation", scan.getCall)
-      .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
-      .item("rowType", relRowType)
-      .item("joinType", joinType)
-      .itemIf("condition", condition.orNull, condition.isDefined)
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataSet[Any] = {
-
-    val config = tableEnv.getConfig
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    // we do not need to specify input type
-    val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
-    val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
-    val rexCall = funcRel.getCall.asInstanceOf[RexCall]
-    val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
-    val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
-
-    val generator = new CodeGenerator(
-      config,
-      false,
-      inputDS.getType,
-      Some(udtfTypeInfo),
-      None,
-      Some(pojoFieldMapping))
-
-    val body = functionBody(
-      generator,
-      udtfTypeInfo,
-      getRowType,
-      rexCall,
-      condition,
-      config,
-      joinType,
-      expectedType)
-
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
-      body,
-      returnType)
-
-    val mapFunc = correlateMapFunction(genFunction)
-
-    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
deleted file mode 100644
index d2203d0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.runtime.IntersectCoGroupFunction
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
-  * Flink RelNode which translates Intersect into CoGroup Operator.
-  *
-  */
-class DataSetIntersect(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    leftNode: RelNode,
-    rightNode: RelNode,
-    rowRelDataType: RelDataType,
-    all: Boolean)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
-    with DataSetRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetIntersect(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      inputs.get(1),
-      getRowType,
-      all
-    )
-  }
-
-  override def toString: String = {
-    s"Intersect(intersect: ($intersectSelectionToString))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("intersect", intersectSelectionToString)
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-    val children = this.getInputs
-    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
-      val rowCnt = metadata.getRowCount(child)
-      val rowSize = this.estimateRowSize(child.getRowType)
-      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
-    }
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
-    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
-
-    val coGroupOpName = s"intersect: ($intersectSelectionToString)"
-    val coGroupFunction = new IntersectCoGroupFunction[Any](all)
-
-    val intersectDs = coGroupedDs.where("*").equalTo("*")
-      .`with`(coGroupFunction).name(coGroupOpName)
-
-    val config = tableEnv.getConfig
-    val leftType = leftDataSet.getType
-
-    // here we only care about left type information, because we emit records from left DataSet
-    expectedType match {
-      case None if config.getEfficientTypeUsage =>
-        intersectDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != leftType) {
-          val mapFunc = getConversionMapper(
-            config,
-            false,
-            leftType,
-            determinedType,
-            "DataSetIntersectConversion",
-            getRowType.getFieldNames)
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          intersectDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          intersectDs
-        }
-    }
-  }
-
-  private def intersectSelectionToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
deleted file mode 100644
index ccd84ca..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.calcite.util.mapping.IntPair
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.FlatJoinRunner
-import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rex.RexNode
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-/**
-  * Flink RelNode which matches along with JoinOperator and its related operations.
-  */
-class DataSetJoin(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    leftNode: RelNode,
-    rightNode: RelNode,
-    rowRelDataType: RelDataType,
-    joinCondition: RexNode,
-    joinRowType: RelDataType,
-    joinInfo: JoinInfo,
-    keyPairs: List[IntPair],
-    joinType: JoinRelType,
-    joinHint: JoinHint,
-    ruleDescription: String)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
-  with DataSetRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetJoin(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      inputs.get(1),
-      getRowType,
-      joinCondition,
-      joinRowType,
-      joinInfo,
-      keyPairs,
-      joinType,
-      joinHint,
-      ruleDescription)
-  }
-
-  override def toString: String = {
-    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .item("where", joinConditionToString)
-      .item("join", joinSelectionToString)
-      .item("joinType", joinTypeToString)
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
-    val leftRowCnt = metadata.getRowCount(getLeft)
-    val leftRowSize = estimateRowSize(getLeft.getRowType)
-
-    val rightRowCnt = metadata.getRowCount(getRight)
-    val rightRowSize = estimateRowSize(getRight.getRowType)
-
-    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
-    val cpuCost = leftRowCnt + rightRowCnt
-    val rowCnt = leftRowCnt + rightRowCnt
-
-    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val config = tableEnv.getConfig
-
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    // get the equality keys
-    val leftKeys = ArrayBuffer.empty[Int]
-    val rightKeys = ArrayBuffer.empty[Int]
-    if (keyPairs.isEmpty) {
-      // if no equality keys => not supported
-      throw TableException(
-        "Joins should have at least one equality condition.\n" +
-          s"\tLeft: ${left.toString},\n" +
-          s"\tRight: ${right.toString},\n" +
-          s"\tCondition: ($joinConditionToString)"
-      )
-    }
-    else {
-      // at least one equality expression
-      val leftFields = left.getRowType.getFieldList
-      val rightFields = right.getRowType.getFieldList
-
-      keyPairs.foreach(pair => {
-        val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
-        val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
-
-        // check if keys are compatible
-        if (leftKeyType == rightKeyType) {
-          // add key pair
-          leftKeys.add(pair.source)
-          rightKeys.add(pair.target)
-        } else {
-          throw TableException(
-            "Equality join predicate on incompatible types.\n" +
-              s"\tLeft: ${left.toString},\n" +
-              s"\tRight: ${right.toString},\n" +
-              s"\tCondition: ($joinConditionToString)"
-          )
-        }
-      })
-    }
-
-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
-    val (joinOperator, nullCheck) = joinType match {
-      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
-      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
-      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
-      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
-    }
-
-    if (nullCheck && !config.getNullCheck) {
-      throw TableException("Null check in TableConfig must be enabled for outer joins.")
-    }
-
-    val generator = new CodeGenerator(
-      config,
-      nullCheck,
-      leftDataSet.getType,
-      Some(rightDataSet.getType))
-    val conversion = generator.generateConverterResultExpression(
-      returnType,
-      joinRowType.getFieldNames)
-
-    var body = ""
-
-    if (joinInfo.isEqui) {
-      // only equality condition
-      body = s"""
-           |${conversion.code}
-           |${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |""".stripMargin
-    }
-    else {
-      val condition = generator.generateExpression(joinCondition)
-      body = s"""
-           |${condition.code}
-           |if (${condition.resultTerm}) {
-           |  ${conversion.code}
-           |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |}
-           |""".stripMargin
-    }
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatJoinFunction[Any, Any, Any]],
-      body,
-      returnType)
-
-    val joinFun = new FlatJoinRunner[Any, Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-    val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
-
-    joinOperator.where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*)
-      .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]]
-  }
-
-  private def joinSelectionToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
-  }
-
-  private def joinConditionToString: String = {
-
-    val inFields = joinRowType.getFieldNames.asScala.toList
-    getExpressionString(joinCondition, inFields, None)
-  }
-
-  private def joinTypeToString = joinType match {
-    case JoinRelType.INNER => "InnerJoin"
-    case JoinRelType.LEFT=> "LeftOuterJoin"
-    case JoinRelType.RIGHT => "RightOuterJoin"
-    case JoinRelType.FULL => "FullOuterJoin"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
deleted file mode 100644
index 6a5cbd1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.runtime.MinusCoGroupFunction
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
-  * Flink RelNode which implements set minus operation.
-  *
-  */
-class DataSetMinus(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    leftNode: RelNode,
-    rightNode: RelNode,
-    rowRelDataType: RelDataType,
-    all: Boolean)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
-    with DataSetRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetMinus(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      inputs.get(1),
-      getRowType,
-      all
-    )
-  }
-
-  override def toString: String = {
-    s"Minus(minus: ($minusSelectionToString}))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("minus", minusSelectionToString)
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-    val children = this.getInputs
-    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
-      val rowCnt = metadata.getRowCount(child)
-      val rowSize = this.estimateRowSize(child.getRowType)
-      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
-    }
-  }
-
-  override def estimateRowCount(mq: RelMetadataQuery): Double = {
-    // from org.apache.calcite.rel.metadata.RelMdUtil.getMinusRowCount
-    val children = this.getInputs
-    var rowCnt = mq.getRowCount(children.head)
-    getInputs.tail.foreach(rowCnt -= 0.5 * mq.getRowCount(_))
-    if (rowCnt < 0) {
-      rowCnt = 0.0
-    }
-    rowCnt
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
-    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
-
-    val coGroupOpName = s"minus: ($minusSelectionToString)"
-    val coGroupFunction = new MinusCoGroupFunction[Any](all)
-
-    val minusDs = coGroupedDs.where("*").equalTo("*")
-      .`with`(coGroupFunction).name(coGroupOpName)
-
-    val config = tableEnv.getConfig
-    val leftType = leftDataSet.getType
-
-    // here we only care about left type information, because we emit records from left DataSet
-    expectedType match {
-      case None if config.getEfficientTypeUsage =>
-        minusDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != leftType) {
-          val mapFunc = getConversionMapper(
-            config = config,
-            nullableInput = false,
-            inputType = leftType,
-            expectedType = determinedType,
-            conversionOperatorName = "DataSetMinusConversion",
-            fieldNames = getRowType.getFieldNames)
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          minusDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          minusDs
-        }
-    }
-  }
-
-  private def minusSelectionToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
deleted file mode 100644
index 82c75e1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.nodes.FlinkRel
-import org.apache.flink.api.table.runtime.MapRunner
-import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
-
-import scala.collection.JavaConversions._
-
-trait DataSetRel extends RelNode with FlinkRel {
-
-  /**
-    * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
-    *
-    * @param tableEnv [[org.apache.flink.api.table.BatchTableEnvironment]] of the translated Table.
-    * @param expectedType specifies the type the Flink operator should return. The type must
-    *                     have the same arity as the result. For instance, if the
-    *                     expected type is a RowTypeInfo this method will return a DataSet of
-    *                     type Row. If the expected type is Tuple2, the operator will return
-    *                     a Tuple2 if possible. Row otherwise.
-    * @return DataSet of type expectedType or RowTypeInfo
-    */
-  def translateToPlan(
-     tableEnv: BatchTableEnvironment,
-     expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
-
-  private[flink] def estimateRowSize(rowType: RelDataType): Double = {
-
-    rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
-      t match {
-        case SqlTypeName.TINYINT => s + 1
-        case SqlTypeName.SMALLINT => s + 2
-        case SqlTypeName.INTEGER => s + 4
-        case SqlTypeName.BIGINT => s + 8
-        case SqlTypeName.BOOLEAN => s + 1
-        case SqlTypeName.FLOAT => s + 4
-        case SqlTypeName.DOUBLE => s + 8
-        case SqlTypeName.VARCHAR => s + 12
-        case SqlTypeName.CHAR => s + 1
-        case SqlTypeName.DECIMAL => s + 12
-        case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
-        case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
-        case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
-        case _ => throw TableException(s"Unsupported data type encountered: $t")
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
deleted file mode 100644
index b783136..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.plan.schema.DataSetTable
-
-/**
-  * Flink RelNode which matches along with DataSource.
-  * It ensures that types without deterministic field order (e.g. POJOs) are not part of
-  * the plan translation.
-  */
-class DataSetScan(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    table: RelOptTable,
-    rowRelDataType: RelDataType)
-  extends BatchScan(cluster, traitSet, table) {
-
-  val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetScan(
-      cluster,
-      traitSet,
-      getTable,
-      getRowType
-    )
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val config = tableEnv.getConfig
-    val inputDataSet: DataSet[Any] = dataSetTable.dataSet
-
-    convertToExpectedType(inputDataSet, dataSetTable, expectedType, config)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
deleted file mode 100644
index 0306c00..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
-import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
-  * Flink RelNode that executes a Join where one of inputs is a single row.
-  */
-class DataSetSingleRowJoin(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    leftNode: RelNode,
-    rightNode: RelNode,
-    leftIsSingle: Boolean,
-    rowRelDataType: RelDataType,
-    joinCondition: RexNode,
-    joinRowType: RelDataType,
-    ruleDescription: String)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
-  with DataSetRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetSingleRowJoin(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      inputs.get(1),
-      leftIsSingle,
-      getRowType,
-      joinCondition,
-      joinRowType,
-      ruleDescription)
-  }
-
-  override def toString: String = {
-    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .item("where", joinConditionToString)
-      .item("join", joinSelectionToString)
-      .item("joinType", joinTypeToString)
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-    val child = if (leftIsSingle) {
-      this.getRight
-    } else {
-      this.getLeft
-    }
-    val rowCnt = metadata.getRowCount(child)
-    val rowSize = this.estimateRowSize(child.getRowType)
-    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val broadcastSetName = "joinSet"
-    val mapSideJoin = generateMapFunction(
-      tableEnv.getConfig,
-      leftDataSet.getType,
-      rightDataSet.getType,
-      leftIsSingle,
-      joinCondition,
-      broadcastSetName,
-      expectedType)
-
-    val (multiRowDataSet, singleRowDataSet) =
-      if (leftIsSingle) {
-        (rightDataSet, leftDataSet)
-      } else {
-        (leftDataSet, rightDataSet)
-      }
-
-    multiRowDataSet
-      .flatMap(mapSideJoin)
-      .withBroadcastSet(singleRowDataSet, broadcastSetName)
-      .name(getMapOperatorName)
-      .asInstanceOf[DataSet[Any]]
-  }
-
-  private def generateMapFunction(
-      config: TableConfig,
-      inputType1: TypeInformation[Any],
-      inputType2: TypeInformation[Any],
-      firstIsSingle: Boolean,
-      joinCondition: RexNode,
-      broadcastInputSetName: String,
-      expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
-
-    val codeGenerator = new CodeGenerator(
-      config,
-      false,
-      inputType1,
-      Some(inputType2))
-
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val conversion = codeGenerator.generateConverterResultExpression(
-      returnType,
-      joinRowType.getFieldNames)
-
-    val condition = codeGenerator.generateExpression(joinCondition)
-
-    val joinMethodBody = s"""
-                  |${condition.code}
-                  |if (${condition.resultTerm}) {
-                  |  ${conversion.code}
-                  |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
-                  |}
-                  |""".stripMargin
-
-    val genFunction = codeGenerator.generateFunction(
-      ruleDescription,
-      classOf[FlatJoinFunction[Any, Any, Any]],
-      joinMethodBody,
-      returnType)
-
-    if (firstIsSingle) {
-      new MapJoinRightRunner[Any, Any, Any](
-        genFunction.name,
-        genFunction.code,
-        genFunction.returnType,
-        broadcastInputSetName)
-    } else {
-      new MapJoinLeftRunner[Any, Any, Any](
-        genFunction.name,
-        genFunction.code,
-        genFunction.returnType,
-        broadcastInputSetName)
-    }
-  }
-
-  private def getMapOperatorName: String = {
-    s"where: ($joinConditionToString), join: ($joinSelectionToString)"
-  }
-
-  private def joinSelectionToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
-  }
-
-  private def joinConditionToString: String = {
-    val inFields = joinRowType.getFieldNames.asScala.toList
-    getExpressionString(joinCondition, inFields, None)
-  }
-
-  private def joinTypeToString: String = {
-    "NestedLoopJoin"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
deleted file mode 100644
index 661aeef..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import java.util
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelFieldCollation.Direction
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexLiteral, RexNode}
-import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
-import org.apache.flink.api.table.runtime.{CountPartitionFunction, LimitFilterFunction}
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConverters._
-
-class DataSetSort(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inp: RelNode,
-    collations: RelCollation,
-    rowRelDataType: RelDataType,
-    offset: RexNode,
-    fetch: RexNode)
-  extends SingleRel(cluster, traitSet, inp)
-  with DataSetRel {
-
-  private val limitStart: Long = if (offset != null) {
-    RexLiteral.intValue(offset)
-  } else {
-    0L
-  }
-
-  private val limitEnd: Long = if (fetch != null) {
-    RexLiteral.intValue(fetch) + limitStart
-  } else {
-    Long.MaxValue
-  }
-
-  override def deriveRowType(): RelDataType = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
-    new DataSetSort(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      collations,
-      getRowType,
-      offset,
-      fetch
-    )
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]] = None)
-    : DataSet[Any] = {
-
-    if (fieldCollations.isEmpty) {
-      throw TableException("Limiting the result without sorting is not allowed " +
-        "as it could lead to arbitrary results.")
-    }
-
-    val config = tableEnv.getConfig
-
-    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
-    val currentParallelism = inputDs.getExecutionEnvironment.getParallelism
-    var partitionedDs = if (currentParallelism == 1) {
-      inputDs
-    } else {
-      inputDs.partitionByRange(fieldCollations.map(_._1): _*)
-        .withOrders(fieldCollations.map(_._2): _*)
-    }
-
-    fieldCollations.foreach { fieldCollation =>
-      partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
-    }
-
-    val limitedDs = if (offset == null && fetch == null) {
-      partitionedDs
-    } else {
-      val countFunction = new CountPartitionFunction[Any]
-
-      val partitionCountName = s"prepare offset/fetch"
-
-      val partitionCount = partitionedDs
-        .mapPartition(countFunction)
-        .name(partitionCountName)
-
-      val broadcastName = "countPartition"
-
-      val limitFunction = new LimitFilterFunction[Any](
-        limitStart,
-        limitEnd,
-        broadcastName)
-
-      val limitName = s"offset: $offsetToString, fetch: $fetchToString"
-
-      partitionedDs
-        .filter(limitFunction)
-        .name(limitName)
-        .withBroadcastSet(partitionCount, broadcastName)
-    }
-
-    val inputType = partitionedDs.getType
-    expectedType match {
-
-      case None if config.getEfficientTypeUsage =>
-        limitedDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != inputType) {
-
-          val mapFunc = getConversionMapper(
-            config = config,
-            nullableInput = false,
-            inputType = partitionedDs.getType,
-            expectedType = determinedType,
-            conversionOperatorName = "DataSetSortConversion",
-            fieldNames = getRowType.getFieldNames.asScala
-          )
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          limitedDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          limitedDs
-        }
-    }
-  }
-
-  private def directionToOrder(direction: Direction) = {
-    direction match {
-      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
-      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
-      case _ => throw new IllegalArgumentException("Unsupported direction.")
-    }
-
-  }
-
-  private val fieldCollations = collations.getFieldCollations.asScala
-    .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
-
-  private val sortFieldsToString = fieldCollations
-    .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
-
-  private val offsetToString = s"$offset"
-
-  private val fetchToString = if (limitEnd == Long.MaxValue) {
-    "unlimited"
-  } else {
-    s"$limitEnd"
-  }
-
-  override def toString: String =
-    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
-
-  override def explainTerms(pw: RelWriter) : RelWriter = {
-    super.explainTerms(pw)
-      .item("orderBy", sortFieldsToString)
-      .item("offset", offsetToString)
-      .item("fetch", fetchToString)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
deleted file mode 100644
index 6e43fae..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
-* Flink RelNode which matches along with UnionOperator.
-*
-*/
-class DataSetUnion(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    leftNode: RelNode,
-    rightNode: RelNode,
-    rowRelDataType: RelDataType)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
-  with DataSetRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetUnion(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      inputs.get(1),
-      rowRelDataType
-    )
-  }
-
-  override def toString: String = {
-    s"Union(union: ($unionSelectionToString))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("union", unionSelectionToString)
-  }
-
-  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
-    val children = this.getInputs
-    val rowCnt = children.foldLeft(0D) { (rows, child) =>
-      rows + metadata.getRowCount(child)
-    }
-
-    planner.getCostFactory.makeCost(rowCnt, 0, 0)
-  }
-
-  override def estimateRowCount(mq: RelMetadataQuery): Double = {
-    // adopted from org.apache.calcite.rel.metadata.RelMdUtil.getUnionAllRowCount
-    getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    var leftDataSet: DataSet[Any] = null
-    var rightDataSet: DataSet[Any] = null
-
-    expectedType match {
-      case None =>
-        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-        rightDataSet =
-          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
-      case _ =>
-        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-    }
-
-    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
-  }
-
-  private def unionSelectionToString: String = {
-    rowRelDataType.getFieldNames.asScala.toList.mkString(", ")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
deleted file mode 100644
index 4f3a257..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Values
-import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.io.ValuesInputFormat
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConverters._
-
-/**
-  * DataSet RelNode for a LogicalValues.
-  *
-  */
-class DataSetValues(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    rowRelDataType: RelDataType,
-    tuples: ImmutableList[ImmutableList[RexLiteral]],
-    ruleDescription: String)
-  extends Values(cluster, rowRelDataType, tuples, traitSet)
-  with DataSetRel {
-
-  override def deriveRowType() = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataSetValues(
-      cluster,
-      traitSet,
-      getRowType,
-      getTuples,
-      ruleDescription
-    )
-  }
-
-  override def toString: String = {
-    s"Values(values: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw).item("values", valuesFieldsToString)
-  }
-
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    val config = tableEnv.getConfig
-
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val generator = new CodeGenerator(config)
-
-    // generate code for every record
-    val generatedRecords = getTuples.asScala.map { r =>
-      generator.generateResultExpression(
-        returnType,
-        getRowType.getFieldNames.asScala,
-        r.asScala)
-    }
-
-    // generate input format
-    val generatedFunction = generator.generateValuesInputFormat(
-      ruleDescription,
-      generatedRecords.map(_.code),
-      returnType)
-
-    val inputFormat = new ValuesInputFormat[Any](
-      generatedFunction.name,
-      generatedFunction.code,
-      generatedFunction.returnType)
-
-    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
-  }
-
-  private def valuesFieldsToString: String = {
-    getRowType.getFieldNames.asScala.toList.mkString(", ")
-  }
-
-}
-
-