Posted to commits@flink.apache.org by tw...@apache.org on 2017/02/13 16:51:23 UTC

[2/3] flink git commit: [FLINK-5662] [table] Rework internal type handling of Table API

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 03178ad..245a038 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -22,15 +22,15 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.nodes.FlinkCalc
-import org.apache.flink.table.typeutils.TypeConverter
-import TypeConverter._
-import org.apache.calcite.rex._
 import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -46,7 +46,7 @@ class DataSetCalc(
     private[flink] val calcProgram: RexProgram, // for tests
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
-  with FlinkCalc
+  with CommonCalc
   with DataSetRel {
 
   override def deriveRowType() = rowRelDataType
@@ -99,19 +99,13 @@ class DataSetCalc(
     }
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val generator = new CodeGenerator(config, false, inputDS.getType)
 
@@ -120,12 +114,11 @@ class DataSetCalc(
       inputDS.getType,
       getRowType,
       calcProgram,
-      config,
-      expectedType)
+      config)
 
     val genFunction = generator.generateFunction(
       ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
+      classOf[FlatMapFunction[Row, Row]],
       body,
       returnType)
 

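The change above is the template for the rest of this commit: translateToPlan loses its expectedType parameter, the result type is fixed to the internal Row type derived from the Calcite row type, and the generated function is typed FlatMapFunction[Row, Row] instead of FlatMapFunction[Any, Any]. As a rough illustration of what the generated calc function does at runtime, here is a hand-written stand-in (the real body comes from CodeGenerator.generateFunction; the condition and projection below are made up):

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Hand-written stand-in for the code-generated FlatMapFunction[Row, Row]
    // of a DataSetCalc: evaluate the filter condition, project the remaining
    // fields, emit the result row. Condition and projection are illustrative.
    class CalcSketch extends FlatMapFunction[Row, Row] {
      override def flatMap(in: Row, out: Collector[Row]): Unit = {
        // stand-in condition: field 0 > 0
        val key = in.getField(0).asInstanceOf[Int]
        if (key > 0) {
          // stand-in projection: compute field 0 * 2, keep field 1
          val result = new Row(2)
          result.setField(0, Integer.valueOf(key * 2))
          result.setField(1, in.getField(1))
          out.collect(result)
        }
      }
    }
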
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 5a75e5d..c18a829 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode that joins with a user-defined table function.
@@ -45,7 +45,7 @@ class DataSetCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkCorrelate
+  with CommonCorrelate
   with DataSetRel {
 
   override def deriveRowType() = relRowType
@@ -85,10 +85,7 @@ class DataSetCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
@@ -109,7 +106,6 @@ class DataSetCorrelate(
       joinType,
       rexCall,
       condition,
-      expectedType,
       Some(pojoFieldMapping),
       ruleDescription)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
index 332aa8a..4497df3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.runtime.IntersectCoGroupFunction
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -75,55 +74,21 @@ class DataSetIntersect(
     }
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val coGroupedDs = leftDataSet.coGroup(rightDataSet)
 
     val coGroupOpName = s"intersect: ($intersectSelectionToString)"
-    val coGroupFunction = new IntersectCoGroupFunction[Any](all)
-
-    val intersectDs = coGroupedDs.where("*").equalTo("*")
-      .`with`(coGroupFunction).name(coGroupOpName)
-
-    val config = tableEnv.getConfig
-    val leftType = leftDataSet.getType
-
-    // here we only care about left type information, because we emit records from left DataSet
-    expectedType match {
-      case None if config.getEfficientTypeUsage =>
-        intersectDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != leftType) {
-          val mapFunc = getConversionMapper(
-            config,
-            false,
-            leftType,
-            determinedType,
-            "DataSetIntersectConversion",
-            getRowType.getFieldNames)
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          intersectDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          intersectDs
-        }
-    }
+    val coGroupFunction = new IntersectCoGroupFunction[Row](all)
+
+    coGroupedDs
+      .where("*")
+      .equalTo("*")
+      .`with`(coGroupFunction)
+      .name(coGroupOpName)
   }
 
   private def intersectSelectionToString: String = {

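With both inputs now guaranteed to be DataSet[Row] of the same internal type, the whole expectedType/conversion branch above becomes dead weight and the co-group result can be returned directly. A minimal sketch of intersect-via-coGroup semantics, assuming (not quoting) the behavior of IntersectCoGroupFunction:

    import org.apache.flink.api.common.functions.CoGroupFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector
    import scala.collection.JavaConverters._

    // Sketch of intersect-by-cogroup semantics (assumed, not the shipped
    // IntersectCoGroupFunction): emit a row only if the same row exists on
    // both sides; with `all`, emit it min(leftCount, rightCount) times.
    class IntersectSketch(all: Boolean) extends CoGroupFunction[Row, Row, Row] {
      override def coGroup(
          left: java.lang.Iterable[Row],
          right: java.lang.Iterable[Row],
          out: Collector[Row]): Unit = {
        val l = left.asScala.toSeq
        val r = right.asScala.toSeq
        if (l.nonEmpty && r.nonEmpty) {
          val emitCount = if (all) math.min(l.size, r.size) else 1
          l.take(emitCount).foreach(out.collect)
        }
      }
    }
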
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index edb5be2..e6f8ca4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -23,17 +23,16 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.FlatJoinRunner
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -102,17 +101,11 @@ class DataSetJoin(
     planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     // get the equality keys
     val leftKeys = ArrayBuffer.empty[Int]
@@ -195,19 +188,22 @@ class DataSetJoin(
     }
     val genFunction = generator.generateFunction(
       ruleDescription,
-      classOf[FlatJoinFunction[Any, Any, Any]],
+      classOf[FlatJoinFunction[Row, Row, Row]],
       body,
       returnType)
 
-    val joinFun = new FlatJoinRunner[Any, Any, Any](
+    val joinFun = new FlatJoinRunner[Row, Row, Row](
       genFunction.name,
       genFunction.code,
       genFunction.returnType)
 
     val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
 
-    joinOperator.where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*)
-      .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]]
+    joinOperator
+      .where(leftKeys.toArray: _*)
+      .equalTo(rightKeys.toArray: _*)
+      .`with`(joinFun)
+      .name(joinOpName)
   }
 
   private def joinSelectionToString: String = {

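The join follows the same template: the generated function is now FlatJoinFunction[Row, Row, Row] and the trailing cast to DataSet[Any] disappears. A hand-written stand-in for such a generated join function might look like this (the real body is emitted by CodeGenerator and wrapped in FlatJoinRunner; the plain field concatenation is an assumption for illustration):

    import org.apache.flink.api.common.functions.FlatJoinFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Stand-in for a code-generated FlatJoinFunction[Row, Row, Row]:
    // concatenate the matched left and right rows into one output row.
    // A non-equi join condition, if any, would be checked before collecting.
    class JoinSketch extends FlatJoinFunction[Row, Row, Row] {
      override def join(left: Row, right: Row, out: Collector[Row]): Unit = {
        val joined = new Row(left.getArity + right.getArity)
        for (i <- 0 until left.getArity) joined.setField(i, left.getField(i))
        for (i <- 0 until right.getArity) {
          joined.setField(left.getArity + i, right.getField(i))
        }
        out.collect(joined)
      }
    }
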
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
index 672ff9c..9ba65bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
@@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.runtime.MinusCoGroupFunction
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -86,55 +85,21 @@ class DataSetMinus(
     rowCnt
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val coGroupedDs = leftDataSet.coGroup(rightDataSet)
 
     val coGroupOpName = s"minus: ($minusSelectionToString)"
-    val coGroupFunction = new MinusCoGroupFunction[Any](all)
-
-    val minusDs = coGroupedDs.where("*").equalTo("*")
-      .`with`(coGroupFunction).name(coGroupOpName)
-
-    val config = tableEnv.getConfig
-    val leftType = leftDataSet.getType
-
-    // here we only care about left type information, because we emit records from left DataSet
-    expectedType match {
-      case None if config.getEfficientTypeUsage =>
-        minusDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != leftType) {
-          val mapFunc = getConversionMapper(
-            config = config,
-            nullableInput = false,
-            inputType = leftType,
-            expectedType = determinedType,
-            conversionOperatorName = "DataSetMinusConversion",
-            fieldNames = getRowType.getFieldNames)
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          minusDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          minusDs
-        }
-    }
+    val coGroupFunction = new MinusCoGroupFunction[Row](all)
+
+    coGroupedDs
+      .where("*")
+      .equalTo("*")
+      .`with`(coGroupFunction)
+      .name(coGroupOpName)
   }
 
   private def minusSelectionToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
index 02138cf..980f3cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,26 +19,19 @@
 package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.plan.nodes.FlinkRel
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.types.Row
 
 trait DataSetRel extends RelNode with FlinkRel {
 
   /**
     * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
     *
-    * @param tableEnv     [[BatchTableEnvironment]] of the translated Table.
-    * @param expectedType specifies the type the Flink operator should return. The type must
-    *                     have the same arity as the result. For instance, if the
-    *                     expected type is a RowTypeInfo this method will return a DataSet of
-    *                     type Row. If the expected type is Tuple2, the operator will return
-    *                     a Tuple2 if possible. Row otherwise.
-    * @return DataSet of type expectedType or RowTypeInfo
+    * @param tableEnv The [[BatchTableEnvironment]] of the translated Table.
+    * @return DataSet of type [[Row]]
     */
-  def translateToPlan(
-     tableEnv: BatchTableEnvironment,
-     expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
+  def translateToPlan(tableEnv: BatchTableEnvironment) : DataSet[Row]
 
 }

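This trait change is the core of the commit: every DataSet node now translates to DataSet[Row] unconditionally, and the type negotiation through expectedType is gone. A toy model of the old and new contracts, using Seq in place of DataSet so it runs standalone (names reused for illustration only):

    import org.apache.flink.types.Row

    // Toy model, not Flink API: the old contract let callers request a
    // result type, the new contract always hands back rows and defers any
    // conversion to the plan boundary.
    trait OldStyleRel {
      def translateToPlan(expectedType: Option[Class[_]]): Seq[Any]
    }
    trait NewStyleRel {
      def translateToPlan(): Seq[Row]
    }

    object SignatureDemo extends NewStyleRel {
      override def translateToPlan(): Seq[Row] = {
        val row = new Row(2)
        row.setField(0, Integer.valueOf(1))
        row.setField(1, "a")
        Seq(row)
      }
      def main(args: Array[String]): Unit =
        println(translateToPlan().head) // prints: 1,a
    }
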
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
index 48bbb74..44d2d00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.plan.schema.DataSetTable
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode that corresponds to a DataSource.
@@ -51,14 +51,12 @@ class DataSetScan(
     )
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataSet: DataSet[Any] = dataSetTable.dataSet
 
-    convertToExpectedType(inputDataSet, dataSetTable, expectedType, config)
+    convertToInternalRow(inputDataSet, dataSetTable, config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index a70b4ab..b7d1a4b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -26,10 +26,11 @@ import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -87,9 +88,7 @@ class DataSetSingleRowJoin(
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
@@ -100,8 +99,7 @@ class DataSetSingleRowJoin(
       rightDataSet.getType,
       leftIsSingle,
       joinCondition,
-      broadcastSetName,
-      expectedType)
+      broadcastSetName)
 
     val (multiRowDataSet, singleRowDataSet) =
       if (leftIsSingle) {
@@ -114,17 +112,16 @@ class DataSetSingleRowJoin(
       .flatMap(mapSideJoin)
       .withBroadcastSet(singleRowDataSet, broadcastSetName)
       .name(getMapOperatorName)
-      .asInstanceOf[DataSet[Any]]
   }
 
   private def generateMapFunction(
       config: TableConfig,
-      inputType1: TypeInformation[Any],
-      inputType2: TypeInformation[Any],
+      inputType1: TypeInformation[Row],
+      inputType2: TypeInformation[Row],
       firstIsSingle: Boolean,
       joinCondition: RexNode,
-      broadcastInputSetName: String,
-      expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
+      broadcastInputSetName: String)
+    : FlatMapFunction[Row, Row] = {
 
     val codeGenerator = new CodeGenerator(
       config,
@@ -132,11 +129,7 @@ class DataSetSingleRowJoin(
       inputType1,
       Some(inputType2))
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val conversion = codeGenerator.generateConverterResultExpression(
       returnType,
@@ -144,28 +137,29 @@ class DataSetSingleRowJoin(
 
     val condition = codeGenerator.generateExpression(joinCondition)
 
-    val joinMethodBody = s"""
-                  |${condition.code}
-                  |if (${condition.resultTerm}) {
-                  |  ${conversion.code}
-                  |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
-                  |}
-                  |""".stripMargin
+    val joinMethodBody =
+      s"""
+        |${condition.code}
+        |if (${condition.resultTerm}) {
+        |  ${conversion.code}
+        |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+        |}
+        |""".stripMargin
 
     val genFunction = codeGenerator.generateFunction(
       ruleDescription,
-      classOf[FlatJoinFunction[Any, Any, Any]],
+      classOf[FlatJoinFunction[Row, Row, Row]],
       joinMethodBody,
       returnType)
 
     if (firstIsSingle) {
-      new MapJoinRightRunner[Any, Any, Any](
+      new MapJoinRightRunner[Row, Row, Row](
         genFunction.name,
         genFunction.code,
         genFunction.returnType,
         broadcastInputSetName)
     } else {
-      new MapJoinLeftRunner[Any, Any, Any](
+      new MapJoinLeftRunner[Row, Row, Row](
         genFunction.name,
         genFunction.code,
         genFunction.returnType,

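The single-row join itself is untouched mechanically: the single-row side is broadcast and the multi-row side flatMaps against it; only the types tighten from Any to Row. A hand-written sketch of that map-side join idea (a stand-in for the generated function wrapped by MapJoinLeft/RightRunner; the join-on-field-0 condition is invented):

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Sketch of the map-side join: the single row arrives via a broadcast
    // set, each record of the multi-row side is joined against it, and
    // matches are emitted as concatenated rows.
    class SingleRowJoinSketch(broadcastName: String)
        extends RichFlatMapFunction[Row, Row] {

      private var single: Row = _

      override def open(parameters: Configuration): Unit = {
        single = getRuntimeContext
          .getBroadcastVariable[Row](broadcastName)
          .get(0)
      }

      override def flatMap(multi: Row, out: Collector[Row]): Unit = {
        // stand-in for the generated join condition: equality on field 0
        if (multi.getField(0) == single.getField(0)) {
          val joined = new Row(multi.getArity + single.getArity)
          for (i <- 0 until multi.getArity) {
            joined.setField(i, multi.getField(i))
          }
          for (i <- 0 until single.getArity) {
            joined.setField(multi.getArity + i, single.getField(i))
          }
          out.collect(joined)
        }
      }
    }
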
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index 4d84730..192237a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -27,11 +27,10 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -87,10 +86,7 @@ class DataSetSort(
     }
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]] = None)
-    : DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     if (fieldCollations.isEmpty) {
       throw TableException("Limiting the result without sorting is not allowed " +
@@ -113,10 +109,10 @@ class DataSetSort(
       partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
     }
 
-    val limitedDs = if (offset == null && fetch == null) {
+    if (offset == null && fetch == null) {
       partitionedDs
     } else {
-      val countFunction = new CountPartitionFunction[Any]
+      val countFunction = new CountPartitionFunction[Row]
 
       val partitionCountName = s"prepare offset/fetch"
 
@@ -126,7 +122,7 @@ class DataSetSort(
 
       val broadcastName = "countPartition"
 
-      val limitFunction = new LimitFilterFunction[Any](
+      val limitFunction = new LimitFilterFunction[Row](
         limitStart,
         limitEnd,
         broadcastName)
@@ -138,41 +134,6 @@ class DataSetSort(
         .name(limitName)
         .withBroadcastSet(partitionCount, broadcastName)
     }
-
-    val inputType = partitionedDs.getType
-    expectedType match {
-
-      case None if config.getEfficientTypeUsage =>
-        limitedDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != inputType) {
-
-          val mapFunc = getConversionMapper(
-            config = config,
-            nullableInput = false,
-            inputType = partitionedDs.getType,
-            expectedType = determinedType,
-            conversionOperatorName = "DataSetSortConversion",
-            fieldNames = getRowType.getFieldNames.asScala
-          )
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          limitedDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          limitedDs
-        }
-    }
   }
 
   private def directionToOrder(direction: Direction) = {

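The offset/fetch machinery that survives the cleanup works in two passes: CountPartitionFunction computes per-partition element counts, which are broadcast to LimitFilterFunction so each partition can locate its elements in the global sorted order. A small sketch of the index arithmetic this implies (assumed semantics, simplified):

    // Sketch of the offset/fetch idea behind CountPartitionFunction +
    // LimitFilterFunction (assumed, simplified): each partition learns the
    // counts of all partitions, derives the global index of its first
    // element, and keeps only elements inside [limitStart, limitEnd).
    object LimitSketch {
      def keepRange(
          partitionCounts: Seq[(Int, Long)], // (partitionId, count) pairs
          partitionId: Int,
          limitStart: Long,
          limitEnd: Long): (Long, Long) = {
        // global index of this partition's first element
        val firstGlobalIndex = partitionCounts
          .filter(_._1 < partitionId)
          .map(_._2)
          .sum
        // local index range to keep
        val from = math.max(limitStart - firstGlobalIndex, 0L)
        val to = limitEnd - firstGlobalIndex
        (from, to)
      }

      def main(args: Array[String]): Unit = {
        // partitions 0,1,2 hold 5, 3 and 4 sorted elements; keep global [6, 9)
        println(keepRange(Seq((0, 5L), (1, 3L), (2, 4L)), 1, 6L, 9L)) // (1,4)
      }
    }
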
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
index b0c95b5..a87c6e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -22,9 +22,9 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -77,24 +77,12 @@ class DataSetUnion(
     getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    var leftDataSet: DataSet[Any] = null
-    var rightDataSet: DataSet[Any] = null
-
-    expectedType match {
-      case None =>
-        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-        rightDataSet =
-          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
-      case _ =>
-        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-    }
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+    leftDataSet.union(rightDataSet)
   }
 
   private def unionSelectionToString: String = {

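The removed match statement existed only to force both union inputs to the same type by threading the left side's TypeInformation into the right side's translation. Since every node now produces the internal row type, the inputs line up by construction. A minimal runnable sketch of the underlying DataSet API constraint:

    import java.util.Arrays
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    // Sketch: union in the DataSet API requires both inputs to have the
    // same type; fixing every node to the internal row type makes the old
    // expectedType negotiation unnecessary.
    object UnionSketch {
      private def row(i: Int, s: String): Row = {
        val r = new Row(2)
        r.setField(0, Integer.valueOf(i))
        r.setField(1, s)
        r
      }

      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val rowType = new RowTypeInfo(
          BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
        val left = env.fromCollection(Arrays.asList(row(1, "a")), rowType)
        val right = env.fromCollection(Arrays.asList(row(2, "b")), rowType)
        left.union(right).print() // prints: 1,a and 2,b
      }
    }
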
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index e0282f2..3ebee2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -66,17 +66,11 @@ class DataSetValues(
     super.explainTerms(pw).item("values", valuesFieldsToString)
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val generator = new CodeGenerator(config)
 
@@ -94,12 +88,12 @@ class DataSetValues(
       generatedRecords.map(_.code),
       returnType)
 
-    val inputFormat = new ValuesInputFormat[Any](
+    val inputFormat = new ValuesInputFormat[Row](
       generatedFunction.name,
       generatedFunction.code,
       generatedFunction.returnType)
 
-    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
+    tableEnv.execEnv.createInput(inputFormat, returnType)
   }
 
   private def valuesFieldsToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index b165afa..48de822 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -23,21 +23,17 @@ import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.TypeConverter
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConversions._
-
 /**
   * Flink RelNode that corresponds to a LogicalWindowAggregate.
   */
@@ -52,7 +48,7 @@ class DataSetWindowAggregate(
     inputType: RelDataType,
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
+  with CommonAggregate
   with DataSetRel {
 
   override def deriveRowType() = rowRelDataType
@@ -109,20 +105,15 @@ class DataSetWindowAggregate(
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }
 
-  override def translateToPlan(
-    tableEnv: BatchTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     // whether identifiers are matched case-sensitively
     val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
-    val result = window match {
+    window match {
       case EventTimeTumblingGroupWindow(_, _, size) =>
         createEventTimeTumblingWindowDataSet(
           inputDS,
@@ -139,31 +130,14 @@ class DataSetWindowAggregate(
             "windows in a batch environment must declare a time attribute over which " +
             "the query is evaluated.")
     }
-
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.toList.mkString(", ")})"
-        result.map(
-          getConversionMapper(
-            config = config,
-            nullableInput = false,
-            inputType = resultRowTypeInfo.asInstanceOf[TypeInformation[Any]],
-            expectedType = expectedType.get,
-            conversionOperatorName = "DataSetWindowAggregateConversion",
-            fieldNames = getRowType.getFieldNames
-          ))
-          .name(mapName)
-      case _ => result
-    }
   }
 
 
   private def createEventTimeTumblingWindowDataSet(
-      inputDS: DataSet[Any],
+      inputDS: DataSet[Row],
       isTimeWindow: Boolean,
       isParserCaseSensitive: Boolean)
-    : DataSet[Any] = {
+    : DataSet[Row] = {
     val mapFunction = createDataSetWindowPrepareMapFunction(
       window,
       namedAggregates,
@@ -182,6 +156,8 @@ class DataSetWindowAggregate(
       .map(mapFunction)
       .name(prepareOperatorName)
 
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
     val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
     if (isTimeWindow) {
       // grouped time window aggregation
@@ -190,9 +166,8 @@ class DataSetWindowAggregate(
       mappedInput.asInstanceOf[DataSet[Row]]
         .groupBy(groupingKeys: _*)
         .reduceGroup(groupReduceFunction)
-        .returns(resultRowTypeInfo)
+        .returns(rowTypeInfo)
         .name(aggregateOperatorName)
-        .asInstanceOf[DataSet[Any]]
     } else {
       // count window
       val groupingKeys = grouping.indices.toArray
@@ -203,10 +178,8 @@ class DataSetWindowAggregate(
           // sort on time field, it's the last element in the row
           .sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
           .reduceGroup(groupReduceFunction)
-          .returns(resultRowTypeInfo)
+          .returns(rowTypeInfo)
           .name(aggregateOperatorName)
-          .asInstanceOf[DataSet[Any]]
-
       } else {
         // TODO: count tumbling all window on event-time should sort all the data set
         // on event time before applying the windowing logic.
@@ -217,11 +190,12 @@ class DataSetWindowAggregate(
   }
 
   private[this] def createEventTimeSessionWindowDataSet(
-    inputDS: DataSet[Any],
-    isParserCaseSensitive: Boolean): DataSet[Any] = {
+      inputDS: DataSet[Row],
+      isParserCaseSensitive: Boolean)
+    : DataSet[Row] = {
 
     val groupingKeys = grouping.indices.toArray
-    val rowTypeInfo = resultRowTypeInfo
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     // grouping window
     if (groupingKeys.length > 0) {
@@ -280,7 +254,6 @@ class DataSetWindowAggregate(
           .reduceGroup(groupReduceFunction)
           .returns(rowTypeInfo)
           .name(aggregateOperatorName)
-          .asInstanceOf[DataSet[Any]]
       }
       // do non-incremental aggregation
       else {
@@ -298,7 +271,6 @@ class DataSetWindowAggregate(
         .reduceGroup(groupReduceFunction)
         .returns(rowTypeInfo)
         .name(aggregateOperatorName)
-        .asInstanceOf[DataSet[Any]]
       }
     }
     // non-grouping window
@@ -332,12 +304,4 @@ class DataSetWindowAggregate(
       s"window: ($window), select: ($aggString)"
     }
   }
-
-  private def resultRowTypeInfo: RowTypeInfo = {
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-      .toArray
-    new RowTypeInfo(fieldTypes: _*)
-  }
 }

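The deleted resultRowTypeInfo helper built a RowTypeInfo by mapping each Calcite field through FlinkTypeFactory.toTypeInfo; FlinkTypeFactory.toInternalRowTypeInfo now centralizes exactly that. A sketch of the idea with the Calcite lookup stubbed out by a fixed table (the SQL-name keys are illustrative):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    // Sketch of what the removed resultRowTypeInfo helper did and what
    // toInternalRowTypeInfo now does centrally: map each field of the
    // Calcite row type to a TypeInformation and build a RowTypeInfo.
    // The Calcite lookup is stubbed with a fixed table here.
    object ResultRowTypeSketch {
      // stand-in for FlinkTypeFactory.toTypeInfo(relDataType)
      private val toTypeInfo: Map[String, TypeInformation[_]] = Map(
        "INTEGER" -> BasicTypeInfo.INT_TYPE_INFO,
        "BIGINT"  -> BasicTypeInfo.LONG_TYPE_INFO,
        "VARCHAR" -> BasicTypeInfo.STRING_TYPE_INFO)

      def resultRowTypeInfo(sqlFieldTypes: Seq[String]): RowTypeInfo =
        new RowTypeInfo(sqlFieldTypes.map(toTypeInfo): _*)

      def main(args: Array[String]): Unit =
        // prints something like: Row(f0: Long, f1: String)
        println(resultRowTypeInfo(Seq("BIGINT", "VARCHAR")))
    }
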
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
index 6a3d4e3..c21d008 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -22,27 +22,23 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
-
-import scala.collection.JavaConverters._
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.types.Row
 
 class DataStreamAggregate(
     window: LogicalWindow,
@@ -55,7 +51,7 @@ class DataStreamAggregate(
     inputType: RelDataType,
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
+  with CommonAggregate
   with DataStreamRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -103,24 +99,12 @@ class DataStreamAggregate(
         namedProperties))
   }
 
-  override def translateToPlan(
-    tableEnv: StreamTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
-    val config = tableEnv.getConfig
     val groupingKeys = grouping.indices.toArray
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] =
-      getRowType.getFieldList.asScala
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-      .toArray
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val aggString = aggregationToString(
       inputType,
@@ -142,121 +126,100 @@ class DataStreamAggregate(
 
     val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
 
-    val result: DataStream[Any] = {
-      // check whether all aggregates support partial aggregate
-      if (AggregateUtil.doAllSupportPartialAggregation(
-            namedAggregates.map(_.getKey),
-            inputType,
-            grouping.length)) {
-        // do Incremental Aggregation
-        val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+
+    // check whether all aggregates support partial aggregate
+    if (AggregateUtil.doAllSupportPartialAggregation(
+          namedAggregates.map(_.getKey),
+          inputType,
+          grouping.length)) {
+      // do Incremental Aggregation
+      val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+        namedAggregates,
+        inputType,
+        getRowType,
+        grouping)
+      // grouped / keyed aggregation
+      if (groupingKeys.length > 0) {
+        val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
+          window,
           namedAggregates,
           inputType,
-          getRowType,
-          grouping)
-        // grouped / keyed aggregation
-        if (groupingKeys.length > 0) {
-          val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-          val windowedStream =
-            createKeyedWindowedStream(window, keyedStream)
-            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-          windowedStream
-          .apply(reduceFunction, windowFunction)
-          .returns(rowTypeInfo)
-          .name(keyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
-        // global / non-keyed aggregation
-        else {
-          val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val windowedStream =
-            createNonKeyedWindowedStream(window, mappedInput)
-            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-          windowedStream
-          .apply(reduceFunction, windowFunction)
-          .returns(rowTypeInfo)
-          .name(nonKeyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+        val windowedStream =
+          createKeyedWindowedStream(window, keyedStream)
+          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+        windowedStream
+        .reduce(reduceFunction, windowFunction)
+        .returns(rowTypeInfo)
+        .name(keyedAggOpName)
       }
+      // global / non-keyed aggregation
       else {
-        // do non-Incremental Aggregation
-        // grouped / keyed aggregation
-        if (groupingKeys.length > 0) {
-
-          val windowFunction = AggregateUtil.createWindowAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-          val windowedStream =
-            createKeyedWindowedStream(window, keyedStream)
-            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-          windowedStream
-          .apply(windowFunction)
-          .returns(rowTypeInfo)
-          .name(keyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
-        // global / non-keyed aggregation
-        else {
-          val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val windowedStream =
-            createNonKeyedWindowedStream(window, mappedInput)
-            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-          windowedStream
-          .apply(windowFunction)
-          .returns(rowTypeInfo)
-          .name(nonKeyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
+        val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        val windowedStream =
+          createNonKeyedWindowedStream(window, mappedInput)
+          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+        windowedStream
+        .reduce(reduceFunction, windowFunction)
+        .returns(rowTypeInfo)
+        .name(nonKeyedAggOpName)
       }
     }
+    else {
+      // do non-Incremental Aggregation
+      // grouped / keyed aggregation
+      if (groupingKeys.length > 0) {
+
+        val windowFunction = AggregateUtil.createWindowAggregationFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+        val windowedStream =
+          createKeyedWindowedStream(window, keyedStream)
+          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
 
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(getConversionMapper(
-          config = config,
-          nullableInput = false,
-          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType = expectedType.get,
-          conversionOperatorName = "DataStreamAggregateConversion",
-          fieldNames = getRowType.getFieldNames.asScala
-        ))
-          .name(mapName)
-      case _ => result
+        windowedStream
+        .apply(windowFunction)
+        .returns(rowTypeInfo)
+        .name(keyedAggOpName)
+      }
+      // global / non-keyed aggregation
+      else {
+        val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        val windowedStream =
+          createNonKeyedWindowedStream(window, mappedInput)
+          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+        windowedStream
+        .apply(windowFunction)
+        .returns(rowTypeInfo)
+        .name(nonKeyedAggOpName)
+      }
     }
   }
 }

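Besides dropping expectedType, this file switches the incremental path from WindowedStream.apply(reduceFunction, windowFunction) to reduce(reduceFunction, windowFunction), i.e. the pre-aggregating ReduceFunction runs as the window's state, so only one accumulated row is kept per window instead of all elements. A toy model of that pre-aggregation (the sum-on-field-1 semantics stand in for the generated IncrementalAggregateReduceFunction):

    import org.apache.flink.types.Row

    // Toy model of incremental window aggregation: the window folds rows
    // into one accumulator as they arrive; the window function would only
    // finalize the result (e.g. attach window start/end properties).
    object IncrementalAggSketch {
      // stand-in reduce: keep the grouping key in field 0, sum field 1
      def reduceRows(a: Row, b: Row): Row = {
        val acc = new Row(2)
        acc.setField(0, a.getField(0))
        acc.setField(1, java.lang.Long.valueOf(
          a.getField(1).asInstanceOf[Long] + b.getField(1).asInstanceOf[Long]))
        acc
      }

      def main(args: Array[String]): Unit = {
        def row(k: String, v: Long) = {
          val r = new Row(2)
          r.setField(0, k)
          r.setField(1, java.lang.Long.valueOf(v))
          r
        }
        val windowElements = Seq(row("a", 1L), row("a", 2L), row("a", 4L))
        println(windowElements.reduce(reduceRows)) // prints: a,7
      }
    }
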
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 43f1fb6..b39ae4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -22,13 +22,13 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.nodes.FlinkCalc
-import org.apache.flink.table.typeutils.TypeConverter._
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode that corresponds to a FlatMapOperator.
@@ -42,7 +42,7 @@ class DataStreamCalc(
     private[flink] val calcProgram: RexProgram,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
-  with FlinkCalc
+  with CommonCalc
   with DataStreamRel {
 
   override def deriveRowType() = rowRelDataType
@@ -68,20 +68,12 @@ class DataStreamCalc(
         calcProgram.getCondition != null)
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
 
     val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
     val generator = new CodeGenerator(config, false, inputDataStream.getType)
 
     val body = functionBody(
@@ -89,14 +81,13 @@ class DataStreamCalc(
       inputDataStream.getType,
       getRowType,
       calcProgram,
-      config,
-      expectedType)
+      config)
 
     val genFunction = generator.generateFunction(
       ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
+      classOf[FlatMapFunction[Row, Row]],
       body,
-      returnType)
+      FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val mapFunc = calcMapFunction(genFunction)
     inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index bd65954..dd799e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -24,11 +24,11 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.table.typeutils.TypeConverter._
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode that joins with a user-defined table function.
@@ -44,7 +44,7 @@ class DataStreamCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkCorrelate
+  with CommonCorrelate
   with DataStreamRel {
 
   override def deriveRowType() = relRowType
@@ -79,10 +79,7 @@ class DataStreamCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
 
@@ -103,7 +100,6 @@ class DataStreamCorrelate(
       joinType,
       rexCall,
       condition,
-      expectedType,
       Some(pojoFieldMapping),
       ruleDescription)
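
The correlate translation follows the same shape: a FlatMapFunction[Row, Row]
that evaluates the table function for each input row and emits one joined Row
per result. A hand-written analogue (the split function and the field layout
are invented for the example):

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    class HandWrittenCorrelate extends FlatMapFunction[Row, Row] {
      // stands in for a user-defined table function
      private def split(s: String): Iterator[String] = s.split(" ").iterator

      override def flatMap(in: Row, out: Collector[Row]): Unit = {
        val text = in.getField(0).asInstanceOf[String]
        split(text).foreach { token =>
          val joined = new Row(2)
          joined.setField(0, text)   // fields of the left (input) row
          joined.setField(1, token)  // fields produced by the table function
          out.collect(joined)
        }
      }
    }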
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 16427b8..6f20831 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.plan.nodes.FlinkRel
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.FlinkRel
+import org.apache.flink.types.Row
 
 trait DataStreamRel extends RelNode with FlinkRel {
 
@@ -30,16 +30,9 @@ trait DataStreamRel extends RelNode with FlinkRel {
     * Translates the FlinkRelNode into a Flink operator.
     *
     * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
-    * @param expectedType specifies the type the Flink operator should return. The type must
-    *                     have the same arity as the result. For instance, if the
-    *                     expected type is a RowTypeInfo this method will return a DataSet of
-    *                     type Row. If the expected type is Tuple2, the operator will return
-    *                     a Tuple2 if possible. Row otherwise.
-    * @return DataStream of type expectedType or RowTypeInfo
+    * @return DataStream of type [[Row]]
     */
-  def translateToPlan(
-    tableEnv: StreamTableEnvironment,
-    expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any]
+  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
 
 }
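
With the expectedType parameter gone, translating a plan is a plain recursion
over DataStream[Row]; any conversion to an external type can happen once at
the edge of the plan instead of inside every node. A minimal sketch of a
caller under the new contract (the translate helper is illustrative, not part
of this change):

    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.table.api.StreamTableEnvironment
    import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
    import org.apache.flink.types.Row

    // no Option[TypeInformation[Any]] threaded through anymore
    def translate(rel: DataStreamRel, tableEnv: StreamTableEnvironment): DataStream[Row] =
      rel.translateToPlan(tableEnv)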
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 2d5ec09..e8d218e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.plan.schema.DataStreamTable
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode that corresponds to a DataStreamSource.
@@ -51,14 +51,12 @@ class DataStreamScan(
     )
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
 
-    convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config)
+    convertToInternalRow(inputDataStream, dataStreamTable, config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index beb15d2..f676176 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -60,9 +60,7 @@ class DataStreamUnion(
     s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index f2a3d72..0ab4a48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.table.typeutils.TypeConverter._
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.io.ValuesInputFormat
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -57,18 +57,11 @@ class DataStreamValues(
     )
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val generator = new CodeGenerator(config)
 
@@ -86,12 +79,12 @@ class DataStreamValues(
       generatedRecords.map(_.code),
       returnType)
 
-    val inputFormat = new ValuesInputFormat[Any](
+    val inputFormat = new ValuesInputFormat[Row](
       generatedFunction.name,
       generatedFunction.code,
       generatedFunction.returnType)
 
-    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
+    tableEnv.execEnv.createInput(inputFormat, returnType)
   }
 
 }
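
DataStreamValues now always produces a DataStream[Row]: the ValuesInputFormat
is typed with the internal RowTypeInfo, so the trailing asInstanceOf cast
disappears. For illustration, an equivalent stream built by hand (the field
types and values are invented for the example):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.types.Row

    object ValuesExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // internal type derived from the logical row type: (INT, STRING)
        val returnType = new RowTypeInfo(
          BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

        val values = env.fromCollection(
          java.util.Arrays.asList(Row.of(Int.box(1), "a"), Row.of(Int.box(2), "b")),
          returnType)

        values.print()
        env.execute("values example")
      }
    }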

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index ddac958..56f7f27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -19,17 +19,13 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.runtime.MapRunner
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -39,69 +35,37 @@ abstract class StreamScan(
     traitSet: RelTraitSet,
     table: RelOptTable)
   extends TableScan(cluster, traitSet, table)
+  with CommonScan
   with DataStreamRel {
 
-  protected def convertToExpectedType(
+  protected def convertToInternalRow(
       input: DataStream[Any],
       flinkTable: FlinkTable[_],
-      expectedType: Option[TypeInformation[Any]],
-      config: TableConfig): DataStream[Any] = {
+      config: TableConfig)
+    : DataStream[Row] = {
 
     val inputType = input.getType
 
-    expectedType match {
-
-      // special case:
-      // if efficient type usage is enabled and no expected type is set
-      // we can simply forward the DataSet to the next operator.
-      // however, we cannot forward PojoTypes as their fields don't have an order
-      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
-        input
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
+    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-        // conversion
-        if (determinedType != inputType) {
-          val generator = new CodeGenerator(
-            config,
-            nullableInput = false,
-            input.getType,
-            flinkTable.fieldIndexes)
+    // conversion
+    if (needsConversion(inputType, internalType)) {
 
-          val conversion = generator.generateConverterResultExpression(
-            determinedType,
-            getRowType.getFieldNames)
+      val mapFunc = getConversionMapper(
+        config,
+        inputType,
+        internalType,
+        "DataStreamSourceConversion",
+        getRowType.getFieldNames,
+        Some(flinkTable.fieldIndexes))
 
-          val body =
-            s"""
-               |${conversion.code}
-               |return ${conversion.resultTerm};
-               |""".stripMargin
+      val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-          val genFunction = generator.generateFunction(
-            "DataSetSourceConversion",
-            classOf[MapFunction[Any, Any]],
-            body,
-            determinedType)
-
-          val mapFunc = new MapRunner[Any, Any](
-            genFunction.name,
-            genFunction.code,
-            genFunction.returnType)
-
-          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          input.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          input
-        }
+      input.map(mapFunc).name(opName)
+    }
+    // no conversion necessary, forward
+    else {
+      input.asInstanceOf[DataStream[Row]]
     }
   }
 }
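
convertToInternalRow now generates a conversion mapper only when
needsConversion determines that the source type differs from the internal Row
type; otherwise the input is forwarded as-is. A hand-written stand-in for such
a generated mapper (the Tuple2 input type is chosen just for the example):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.types.Row

    // flatten the external type into the internal Row, keeping field order
    class Tuple2ToRow extends MapFunction[JTuple2[Integer, String], Row] {
      override def map(value: JTuple2[Integer, String]): Row = {
        val row = new Row(2)
        row.setField(0, value.f0)
        row.setField(1, value.f1)
        row
      }
    }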

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 7550593..73d0291 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.StreamTableSource
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -62,15 +62,13 @@ class StreamTableSourceScan(
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = tableSource
       .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
 
-    convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
+    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index e89f14f..034ff9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -67,9 +67,10 @@ object AggregateUtil {
     *
     */
   private[flink] def createPrepareMapFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    groupings: Array[Int],
-    inputType: RelDataType): MapFunction[Any, Row] = {
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType)
+    : MapFunction[Row, Row] = {
 
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -83,7 +84,7 @@ object AggregateUtil {
       aggregates,
       aggFieldIndexes,
       groupings,
-      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+      mapReturnType)
 
     mapFunction
   }
@@ -113,11 +114,12 @@ object AggregateUtil {
     * NOTE: this function is only used for time based window on batch tables.
     */
   def createDataSetWindowPrepareMapFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    groupings: Array[Int],
-    inputType: RelDataType,
-    isParserCaseSensitive: Boolean): MapFunction[Any, Row] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType,
+      isParserCaseSensitive: Boolean)
+    : MapFunction[Row, Row] = {
 
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -147,7 +149,7 @@ object AggregateUtil {
       groupings,
       timeFieldPos,
       tumbleTimeWindowSize,
-      mapReturnType).asInstanceOf[MapFunction[Any, Row]]
+      mapReturnType)
   }
 
   /**
@@ -159,13 +161,14 @@ object AggregateUtil {
     * NOTE: this function is only used for window on batch tables.
     */
   def createDataSetWindowAggregationGroupReduceFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty],
-    isInputCombined: Boolean = false): RichGroupReduceFunction[Row, Row] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty],
+      isInputCombined: Boolean = false)
+    : RichGroupReduceFunction[Row, Row] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -269,10 +272,11 @@ object AggregateUtil {
     *
     */
   private[flink] def createDataSetWindowAggregationCombineFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    groupings: Array[Int]): RichGroupCombineFunction[Row,Row] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      groupings: Array[Int])
+    : RichGroupCombineFunction[Row, Row] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -313,11 +317,12 @@ object AggregateUtil {
     *
     */
   private[flink] def createAggregateGroupReduceFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    inGroupingSet: Boolean): RichGroupReduceFunction[Row, Row] = {
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      inGroupingSet: Boolean)
+    : RichGroupReduceFunction[Row, Row] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -370,10 +375,11 @@ object AggregateUtil {
     *
     */
   private[flink] def createIncrementalAggregateReduceFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int])
+    : IncrementalAggregateReduceFunction = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey), inputType, groupings.length)._2
@@ -397,13 +403,13 @@ object AggregateUtil {
     * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
     */
   private[flink] def createAllWindowAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty])
-  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
 
     val aggFunction =
       createAggregateGroupReduceFunction(
@@ -427,13 +433,13 @@ object AggregateUtil {
     *
     */
   private[flink] def createWindowAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty])
-  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
 
     val aggFunction =
       createAggregateGroupReduceFunction(
@@ -457,12 +463,13 @@ object AggregateUtil {
     * window aggregates.
     */
   private[flink] def createAllWindowIncrementalAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey), inputType, groupings.length)._2
@@ -499,12 +506,13 @@ object AggregateUtil {
     * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
     */
   private[flink] def createWindowIncrementalAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey), inputType, groupings.length)._2
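
With MapFunction[Row, Row] as the declared type, the asInstanceOf casts on the
prepare mappers above become unnecessary. Roughly, the prepare phase lays out
grouping keys and aggregate inputs into the intermediate row; a simplified,
hand-written sketch of that shape (the real function is built from the
aggregate calls and also initializes their intermediate fields):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.types.Row

    class PrepareMap(groupings: Array[Int], aggFields: Array[Int])
      extends MapFunction[Row, Row] {

      override def map(in: Row): Row = {
        val out = new Row(groupings.length + aggFields.length)
        var i = 0
        groupings.foreach { g => out.setField(i, in.getField(g)); i += 1 }
        aggFields.foreach { a => out.setField(i, in.getField(a)); i += 1 }
        out
      }
    }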

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
deleted file mode 100644
index a2a120b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.core.JoinRelType._
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkTypeFactory
-
-import scala.collection.JavaConversions._
-
-object TypeConverter {
-
-  val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
-
-  /**
-    * Determines the return type of Flink operators based on the logical fields, the expected
-    * physical type and configuration parameters.
-    *
-    * For example:
-    *   - No physical type expected, only 3 non-null fields and efficient type usage enabled
-    *       -> return Tuple3
-    *   - No physical type expected, efficient type usage enabled, but 3 nullable fields
-    *       -> return Row because Tuple does not support null values
-    *   - Physical type expected
-    *       -> check if physical type is compatible and return it
-    *
-    * @param logicalRowType logical row information
-    * @param expectedPhysicalType expected physical type
-    * @param nullable fields can be nullable
-    * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
-    * @return suitable return type
-    */
-  def determineReturnType(
-      logicalRowType: RelDataType,
-      expectedPhysicalType: Option[TypeInformation[Any]],
-      nullable: Boolean,
-      useEfficientTypes: Boolean)
-    : TypeInformation[Any] = {
-    // convert to type information
-    val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
-      FlinkTypeFactory.toTypeInfo(relDataType.getType)
-    }
-    // field names
-    val logicalFieldNames = logicalRowType.getFieldNames.toList
-
-    val returnType = expectedPhysicalType match {
-      // a certain physical type is expected (but not Row)
-      // check if expected physical type is compatible with logical field type
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        if (typeInfo.getArity != logicalFieldTypes.length) {
-          throw new TableException("Arity of result does not match expected type.")
-        }
-        typeInfo match {
-
-          // POJO type expected
-          case pt: PojoTypeInfo[_] =>
-            logicalFieldNames.zip(logicalFieldTypes) foreach {
-              case (fName, fType) =>
-                val pojoIdx = pt.getFieldIndex(fName)
-                if (pojoIdx < 0) {
-                  throw new TableException(s"POJO does not define field name: $fName")
-                }
-                val expectedTypeInfo = pt.getTypeAt(pojoIdx)
-                if (fType != expectedTypeInfo) {
-                  throw new TableException(s"Result field does not match expected type. " +
-                    s"Expected: $expectedTypeInfo; Actual: $fType")
-                }
-            }
-
-          // Tuple/Case class type expected
-          case ct: CompositeType[_] =>
-            logicalFieldTypes.zipWithIndex foreach {
-              case (fieldTypeInfo, i) =>
-                val expectedTypeInfo = ct.getTypeAt(i)
-                if (fieldTypeInfo != expectedTypeInfo) {
-                  throw new TableException(s"Result field does not match expected type. " +
-                    s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
-                }
-            }
-
-          // Atomic type expected
-          case at: AtomicType[_] =>
-            val fieldTypeInfo = logicalFieldTypes.head
-            if (fieldTypeInfo != at) {
-              throw new TableException(s"Result field does not match expected type. " +
-                s"Expected: $at; Actual: $fieldTypeInfo")
-            }
-
-          case _ =>
-            throw new TableException("Unsupported result type.")
-        }
-        typeInfo
-
-      // Row is expected, create the arity for it
-      case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
-        new RowTypeInfo(logicalFieldTypes: _*)
-
-      // no physical type
-      // determine type based on logical fields and configuration parameters
-      case None =>
-        // no need for efficient types -> use Row
-        // we cannot use efficient types if row arity > tuple arity or nullable
-        if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
-          new RowTypeInfo(logicalFieldTypes: _*)
-        }
-        // use efficient type tuple or atomic type
-        else {
-          if (logicalFieldTypes.length == 1) {
-            logicalFieldTypes.head
-          }
-          else {
-            new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
-          }
-        }
-    }
-    returnType.asInstanceOf[TypeInformation[Any]]
-  }
-
-  def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
-    case INNER => JoinType.INNER
-    case LEFT => JoinType.LEFT_OUTER
-    case RIGHT => JoinType.RIGHT_OUTER
-    case FULL => JoinType.FULL_OUTER
-  }
-
-  def flinkJoinTypeToRelType(joinType: JoinType) = joinType match {
-    case JoinType.INNER => JoinRelType.INNER
-    case JoinType.LEFT_OUTER => JoinRelType.LEFT
-    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
-    case JoinType.FULL_OUTER => JoinRelType.FULL
-  }
-}
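
With TypeConverter gone, none of the branching above survives: the planner no
longer negotiates between Row, Tuple, POJO, and atomic types per operator. The
one-line replacement used throughout the nodes in this commit (valid inside a
RelNode, where getRowType is in scope):

    import org.apache.flink.table.calcite.FlinkTypeFactory

    // the internal physical type is always the Row type derived from the
    // logical schema, independent of the null-check and efficient-type
    // settings that determineReturnType used to consult
    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)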