You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/02/13 16:51:22 UTC

[1/3] flink git commit: [FLINK-5662] [table] Rework internal type handling of Table API

Repository: flink
Updated Branches:
  refs/heads/master 1ce10c877 -> 6bc6b225e


http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index e84c906..dece295 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -56,8 +56,7 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 	@Parameterized.Parameters(name = "Table config = {0}")
 	public static Collection<Object[]> parameters() {
 		return Arrays.asList(new Object[][] {
-			{ TableProgramsTestBase.DEFAULT() },
-			{ TableProgramsTestBase.EFFICIENT() }
+			{ TableProgramsTestBase.DEFAULT() }
 		});
 	}
 
@@ -265,8 +264,8 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 		data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
 
 		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data), "a, b, c, d")
-			.select("a, b, c, d");
+			.fromDataSet(env.fromCollection(data), "q, w, e, r")
+			.select("q as a, w as b, e as c, r as d");
 
 		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
 		List<SmallPojo2> results = ds.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 2b00cc9..6cbe834 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -261,8 +261,8 @@ object TableEnvironmentITCase {
   @Parameterized.Parameters(name = "Table config = {0}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
     Seq[Array[AnyRef]](
-      Array(TableProgramsTestBase.DEFAULT),
-      Array(TableProgramsTestBase.EFFICIENT)).asJava
+      Array(TableProgramsTestBase.DEFAULT)
+    ).asJava
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
index a699068..586d716 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala.batch.utils
 import java.util
 
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode}
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{NO_NULL, TableConfigMode}
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.junit.runners.Parameterized
@@ -38,8 +38,6 @@ class TableProgramsTestBase(
     tableConfigMode match {
       case NO_NULL =>
         conf.setNullCheck(false)
-      case EFFICIENT =>
-        conf.setEfficientTypeUsage(true)
       case _ => // keep default
     }
     conf
@@ -47,11 +45,10 @@ class TableProgramsTestBase(
 }
 
 object TableProgramsTestBase {
-  case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean)
+  case class TableConfigMode(nullCheck: Boolean)
 
-  val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false)
-  val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
-  val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
+  val DEFAULT = TableConfigMode(nullCheck = true)
+  val NO_NULL = TableConfigMode(nullCheck = false)
 
   @Parameterized.Parameters(name = "Table config = {0}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 8555632..b4327ec 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -95,7 +95,7 @@ abstract class ExpressionTestBase {
     val generator = new CodeGenerator(config, false, typeInfo)
 
     // cast expressions to String
-    val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR)).toSeq
+    val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR))
 
     // generate code
     val resultType = new RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO): _*)
@@ -110,16 +110,16 @@ abstract class ExpressionTestBase {
         |return ${genExpr.resultTerm};
         |""".stripMargin
 
-    val genFunc = generator.generateFunction[MapFunction[Any, String]](
+    val genFunc = generator.generateFunction[MapFunction[Any, Row], Row](
       "TestFunction",
-      classOf[MapFunction[Any, String]],
+      classOf[MapFunction[Any, Row]],
       bodyCode,
-      resultType.asInstanceOf[TypeInformation[Any]])
+      resultType)
 
     // compile and evaluate
-    val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc)
+    val clazz = new TestCompiler[MapFunction[Any, Row], Row]().compile(genFunc)
     val mapper = clazz.newInstance()
-    val result = mapper.map(testData).asInstanceOf[Row]
+    val result = mapper.map(testData)
 
     // compare
     testExprs
@@ -211,8 +211,8 @@ abstract class ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   // TestCompiler that uses current class loader
-  class TestCompiler[T <: Function] extends Compiler[T] {
-    def compile(genFunc: GeneratedFunction[T]): Class[T] =
+  class TestCompiler[F <: Function, T <: Any] extends Compiler[F] {
+    def compile(genFunc: GeneratedFunction[F, T]): Class[F] =
       compile(getClass.getClassLoader, genFunc.name, genFunc.code)
   }
 }


[2/3] flink git commit: [FLINK-5662] [table] Rework internal type handling of Table API

Posted by tw...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 03178ad..245a038 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -22,15 +22,15 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.nodes.FlinkCalc
-import org.apache.flink.table.typeutils.TypeConverter
-import TypeConverter._
-import org.apache.calcite.rex._
 import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -46,7 +46,7 @@ class DataSetCalc(
     private[flink] val calcProgram: RexProgram, // for tests
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
-  with FlinkCalc
+  with CommonCalc
   with DataSetRel {
 
   override def deriveRowType() = rowRelDataType
@@ -99,19 +99,13 @@ class DataSetCalc(
     }
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val generator = new CodeGenerator(config, false, inputDS.getType)
 
@@ -120,12 +114,11 @@ class DataSetCalc(
       inputDS.getType,
       getRowType,
       calcProgram,
-      config,
-      expectedType)
+      config)
 
     val genFunction = generator.generateFunction(
       ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
+      classOf[FlatMapFunction[Row, Row]],
       body,
       returnType)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 5a75e5d..c18a829 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode which matches along with join a user defined table function.
@@ -45,7 +45,7 @@ class DataSetCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkCorrelate
+  with CommonCorrelate
   with DataSetRel {
 
   override def deriveRowType() = relRowType
@@ -85,10 +85,7 @@ class DataSetCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
@@ -109,7 +106,6 @@ class DataSetCorrelate(
       joinType,
       rexCall,
       condition,
-      expectedType,
       Some(pojoFieldMapping),
       ruleDescription)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
index 332aa8a..4497df3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.runtime.IntersectCoGroupFunction
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -75,55 +74,21 @@ class DataSetIntersect(
     }
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val coGroupedDs = leftDataSet.coGroup(rightDataSet)
 
     val coGroupOpName = s"intersect: ($intersectSelectionToString)"
-    val coGroupFunction = new IntersectCoGroupFunction[Any](all)
-
-    val intersectDs = coGroupedDs.where("*").equalTo("*")
-      .`with`(coGroupFunction).name(coGroupOpName)
-
-    val config = tableEnv.getConfig
-    val leftType = leftDataSet.getType
-
-    // here we only care about left type information, because we emit records from left DataSet
-    expectedType match {
-      case None if config.getEfficientTypeUsage =>
-        intersectDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != leftType) {
-          val mapFunc = getConversionMapper(
-            config,
-            false,
-            leftType,
-            determinedType,
-            "DataSetIntersectConversion",
-            getRowType.getFieldNames)
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          intersectDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          intersectDs
-        }
-    }
+    val coGroupFunction = new IntersectCoGroupFunction[Row](all)
+
+    coGroupedDs
+      .where("*")
+      .equalTo("*")
+      .`with`(coGroupFunction)
+      .name(coGroupOpName)
   }
 
   private def intersectSelectionToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index edb5be2..e6f8ca4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -23,17 +23,16 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.FlatJoinRunner
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -102,17 +101,11 @@ class DataSetJoin(
     planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     // get the equality keys
     val leftKeys = ArrayBuffer.empty[Int]
@@ -195,19 +188,22 @@ class DataSetJoin(
     }
     val genFunction = generator.generateFunction(
       ruleDescription,
-      classOf[FlatJoinFunction[Any, Any, Any]],
+      classOf[FlatJoinFunction[Row, Row, Row]],
       body,
       returnType)
 
-    val joinFun = new FlatJoinRunner[Any, Any, Any](
+    val joinFun = new FlatJoinRunner[Row, Row, Row](
       genFunction.name,
       genFunction.code,
       genFunction.returnType)
 
     val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
 
-    joinOperator.where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*)
-      .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]]
+    joinOperator
+      .where(leftKeys.toArray: _*)
+      .equalTo(rightKeys.toArray: _*)
+      .`with`(joinFun)
+      .name(joinOpName)
   }
 
   private def joinSelectionToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
index 672ff9c..9ba65bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
@@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.runtime.MinusCoGroupFunction
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -86,55 +85,21 @@ class DataSetMinus(
     rowCnt
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val coGroupedDs = leftDataSet.coGroup(rightDataSet)
 
     val coGroupOpName = s"minus: ($minusSelectionToString)"
-    val coGroupFunction = new MinusCoGroupFunction[Any](all)
-
-    val minusDs = coGroupedDs.where("*").equalTo("*")
-      .`with`(coGroupFunction).name(coGroupOpName)
-
-    val config = tableEnv.getConfig
-    val leftType = leftDataSet.getType
-
-    // here we only care about left type information, because we emit records from left DataSet
-    expectedType match {
-      case None if config.getEfficientTypeUsage =>
-        minusDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != leftType) {
-          val mapFunc = getConversionMapper(
-            config = config,
-            nullableInput = false,
-            inputType = leftType,
-            expectedType = determinedType,
-            conversionOperatorName = "DataSetMinusConversion",
-            fieldNames = getRowType.getFieldNames)
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          minusDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          minusDs
-        }
-    }
+    val coGroupFunction = new MinusCoGroupFunction[Row](all)
+
+    coGroupedDs
+      .where("*")
+      .equalTo("*")
+      .`with`(coGroupFunction)
+      .name(coGroupOpName)
   }
 
   private def minusSelectionToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
index 02138cf..980f3cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,26 +19,19 @@
 package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.plan.nodes.FlinkRel
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.types.Row
 
 trait DataSetRel extends RelNode with FlinkRel {
 
   /**
     * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
     *
-    * @param tableEnv     [[BatchTableEnvironment]] of the translated Table.
-    * @param expectedType specifies the type the Flink operator should return. The type must
-    *                     have the same arity as the result. For instance, if the
-    *                     expected type is a RowTypeInfo this method will return a DataSet of
-    *                     type Row. If the expected type is Tuple2, the operator will return
-    *                     a Tuple2 if possible. Row otherwise.
-    * @return DataSet of type expectedType or RowTypeInfo
+    * @param tableEnv The [[BatchTableEnvironment]] of the translated Table.
+    * @return DataSet of type [[Row]]
     */
-  def translateToPlan(
-     tableEnv: BatchTableEnvironment,
-     expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
+  def translateToPlan(tableEnv: BatchTableEnvironment) : DataSet[Row]
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
index 48bbb74..44d2d00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.plan.schema.DataSetTable
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode which matches along with DataSource.
@@ -51,14 +51,12 @@ class DataSetScan(
     )
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataSet: DataSet[Any] = dataSetTable.dataSet
 
-    convertToExpectedType(inputDataSet, dataSetTable, expectedType, config)
+    convertToInternalRow(inputDataSet, dataSetTable, config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index a70b4ab..b7d1a4b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -26,10 +26,11 @@ import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -87,9 +88,7 @@ class DataSetSingleRowJoin(
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
@@ -100,8 +99,7 @@ class DataSetSingleRowJoin(
       rightDataSet.getType,
       leftIsSingle,
       joinCondition,
-      broadcastSetName,
-      expectedType)
+      broadcastSetName)
 
     val (multiRowDataSet, singleRowDataSet) =
       if (leftIsSingle) {
@@ -114,17 +112,16 @@ class DataSetSingleRowJoin(
       .flatMap(mapSideJoin)
       .withBroadcastSet(singleRowDataSet, broadcastSetName)
       .name(getMapOperatorName)
-      .asInstanceOf[DataSet[Any]]
   }
 
   private def generateMapFunction(
       config: TableConfig,
-      inputType1: TypeInformation[Any],
-      inputType2: TypeInformation[Any],
+      inputType1: TypeInformation[Row],
+      inputType2: TypeInformation[Row],
       firstIsSingle: Boolean,
       joinCondition: RexNode,
-      broadcastInputSetName: String,
-      expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
+      broadcastInputSetName: String)
+    : FlatMapFunction[Row, Row] = {
 
     val codeGenerator = new CodeGenerator(
       config,
@@ -132,11 +129,7 @@ class DataSetSingleRowJoin(
       inputType1,
       Some(inputType2))
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val conversion = codeGenerator.generateConverterResultExpression(
       returnType,
@@ -144,28 +137,29 @@ class DataSetSingleRowJoin(
 
     val condition = codeGenerator.generateExpression(joinCondition)
 
-    val joinMethodBody = s"""
-                  |${condition.code}
-                  |if (${condition.resultTerm}) {
-                  |  ${conversion.code}
-                  |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
-                  |}
-                  |""".stripMargin
+    val joinMethodBody =
+      s"""
+        |${condition.code}
+        |if (${condition.resultTerm}) {
+        |  ${conversion.code}
+        |  ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+        |}
+        |""".stripMargin
 
     val genFunction = codeGenerator.generateFunction(
       ruleDescription,
-      classOf[FlatJoinFunction[Any, Any, Any]],
+      classOf[FlatJoinFunction[Row, Row, Row]],
       joinMethodBody,
       returnType)
 
     if (firstIsSingle) {
-      new MapJoinRightRunner[Any, Any, Any](
+      new MapJoinRightRunner[Row, Row, Row](
         genFunction.name,
         genFunction.code,
         genFunction.returnType,
         broadcastInputSetName)
     } else {
-      new MapJoinLeftRunner[Any, Any, Any](
+      new MapJoinLeftRunner[Row, Row, Row](
         genFunction.name,
         genFunction.code,
         genFunction.returnType,

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index 4d84730..192237a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -27,11 +27,10 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -87,10 +86,7 @@ class DataSetSort(
     }
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]] = None)
-    : DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     if (fieldCollations.isEmpty) {
       throw TableException("Limiting the result without sorting is not allowed " +
@@ -113,10 +109,10 @@ class DataSetSort(
       partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
     }
 
-    val limitedDs = if (offset == null && fetch == null) {
+    if (offset == null && fetch == null) {
       partitionedDs
     } else {
-      val countFunction = new CountPartitionFunction[Any]
+      val countFunction = new CountPartitionFunction[Row]
 
       val partitionCountName = s"prepare offset/fetch"
 
@@ -126,7 +122,7 @@ class DataSetSort(
 
       val broadcastName = "countPartition"
 
-      val limitFunction = new LimitFilterFunction[Any](
+      val limitFunction = new LimitFilterFunction[Row](
         limitStart,
         limitEnd,
         broadcastName)
@@ -138,41 +134,6 @@ class DataSetSort(
         .name(limitName)
         .withBroadcastSet(partitionCount, broadcastName)
     }
-
-    val inputType = partitionedDs.getType
-    expectedType match {
-
-      case None if config.getEfficientTypeUsage =>
-        limitedDs
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
-
-        // conversion
-        if (determinedType != inputType) {
-
-          val mapFunc = getConversionMapper(
-            config = config,
-            nullableInput = false,
-            inputType = partitionedDs.getType,
-            expectedType = determinedType,
-            conversionOperatorName = "DataSetSortConversion",
-            fieldNames = getRowType.getFieldNames.asScala
-          )
-
-          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          limitedDs.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          limitedDs
-        }
-    }
   }
 
   private def directionToOrder(direction: Direction) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
index b0c95b5..a87c6e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -22,9 +22,9 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -77,24 +77,12 @@ class DataSetUnion(
     getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
-    var leftDataSet: DataSet[Any] = null
-    var rightDataSet: DataSet[Any] = null
-
-    expectedType match {
-      case None =>
-        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-        rightDataSet =
-          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
-      case _ =>
-        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
-    }
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+    leftDataSet.union(rightDataSet)
   }
 
   private def unionSelectionToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index e0282f2..3ebee2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -66,17 +66,11 @@ class DataSetValues(
     super.explainTerms(pw).item("values", valuesFieldsToString)
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val generator = new CodeGenerator(config)
 
@@ -94,12 +88,12 @@ class DataSetValues(
       generatedRecords.map(_.code),
       returnType)
 
-    val inputFormat = new ValuesInputFormat[Any](
+    val inputFormat = new ValuesInputFormat[Row](
       generatedFunction.name,
       generatedFunction.code,
       generatedFunction.returnType)
 
-    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
+    tableEnv.execEnv.createInput(inputFormat, returnType)
   }
 
   private def valuesFieldsToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index b165afa..48de822 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -23,21 +23,17 @@ import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.TypeConverter
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConversions._
-
 /**
   * Flink RelNode which matches along with a LogicalWindowAggregate.
   */
@@ -52,7 +48,7 @@ class DataSetWindowAggregate(
     inputType: RelDataType,
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
+  with CommonAggregate
   with DataSetRel {
 
   override def deriveRowType() = rowRelDataType
@@ -109,20 +105,15 @@ class DataSetWindowAggregate(
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }
 
-  override def translateToPlan(
-    tableEnv: BatchTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     // whether identifiers are matched case-sensitively
     val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
-    val result = window match {
+    window match {
       case EventTimeTumblingGroupWindow(_, _, size) =>
         createEventTimeTumblingWindowDataSet(
           inputDS,
@@ -139,31 +130,14 @@ class DataSetWindowAggregate(
             "windows in a batch environment must declare a time attribute over which " +
             "the query is evaluated.")
     }
-
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.toList.mkString(", ")})"
-        result.map(
-          getConversionMapper(
-            config = config,
-            nullableInput = false,
-            inputType = resultRowTypeInfo.asInstanceOf[TypeInformation[Any]],
-            expectedType = expectedType.get,
-            conversionOperatorName = "DataSetWindowAggregateConversion",
-            fieldNames = getRowType.getFieldNames
-          ))
-          .name(mapName)
-      case _ => result
-    }
   }
 
 
   private def createEventTimeTumblingWindowDataSet(
-      inputDS: DataSet[Any],
+      inputDS: DataSet[Row],
       isTimeWindow: Boolean,
       isParserCaseSensitive: Boolean)
-    : DataSet[Any] = {
+    : DataSet[Row] = {
     val mapFunction = createDataSetWindowPrepareMapFunction(
       window,
       namedAggregates,
@@ -182,6 +156,8 @@ class DataSetWindowAggregate(
       .map(mapFunction)
       .name(prepareOperatorName)
 
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
     val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
     if (isTimeWindow) {
       // grouped time window aggregation
@@ -190,9 +166,8 @@ class DataSetWindowAggregate(
       mappedInput.asInstanceOf[DataSet[Row]]
         .groupBy(groupingKeys: _*)
         .reduceGroup(groupReduceFunction)
-        .returns(resultRowTypeInfo)
+        .returns(rowTypeInfo)
         .name(aggregateOperatorName)
-        .asInstanceOf[DataSet[Any]]
     } else {
       // count window
       val groupingKeys = grouping.indices.toArray
@@ -203,10 +178,8 @@ class DataSetWindowAggregate(
           // sort on time field, it's the last element in the row
           .sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
           .reduceGroup(groupReduceFunction)
-          .returns(resultRowTypeInfo)
+          .returns(rowTypeInfo)
           .name(aggregateOperatorName)
-          .asInstanceOf[DataSet[Any]]
-
       } else {
         // TODO: count tumbling all window on event-time should sort all the data set
         // on event time before applying the windowing logic.
@@ -217,11 +190,12 @@ class DataSetWindowAggregate(
   }
 
   private[this] def createEventTimeSessionWindowDataSet(
-    inputDS: DataSet[Any],
-    isParserCaseSensitive: Boolean): DataSet[Any] = {
+      inputDS: DataSet[Row],
+      isParserCaseSensitive: Boolean)
+    : DataSet[Row] = {
 
     val groupingKeys = grouping.indices.toArray
-    val rowTypeInfo = resultRowTypeInfo
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     // grouping window
     if (groupingKeys.length > 0) {
@@ -280,7 +254,6 @@ class DataSetWindowAggregate(
           .reduceGroup(groupReduceFunction)
           .returns(rowTypeInfo)
           .name(aggregateOperatorName)
-          .asInstanceOf[DataSet[Any]]
       }
       // do non-incremental aggregation
       else {
@@ -298,7 +271,6 @@ class DataSetWindowAggregate(
         .reduceGroup(groupReduceFunction)
         .returns(rowTypeInfo)
         .name(aggregateOperatorName)
-        .asInstanceOf[DataSet[Any]]
       }
     }
     // non-grouping window
@@ -332,12 +304,4 @@ class DataSetWindowAggregate(
       s"window: ($window), select: ($aggString)"
     }
   }
-
-  private def resultRowTypeInfo: RowTypeInfo = {
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-      .toArray
-    new RowTypeInfo(fieldTypes: _*)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
index 6a3d4e3..c21d008 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -22,27 +22,23 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
-
-import scala.collection.JavaConverters._
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.types.Row
 
 class DataStreamAggregate(
     window: LogicalWindow,
@@ -55,7 +51,7 @@ class DataStreamAggregate(
     inputType: RelDataType,
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
+  with CommonAggregate
   with DataStreamRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -103,24 +99,12 @@ class DataStreamAggregate(
         namedProperties))
   }
 
-  override def translateToPlan(
-    tableEnv: StreamTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
-    val config = tableEnv.getConfig
     val groupingKeys = grouping.indices.toArray
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] =
-      getRowType.getFieldList.asScala
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-      .toArray
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val aggString = aggregationToString(
       inputType,
@@ -142,121 +126,100 @@ class DataStreamAggregate(
 
     val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
 
-    val result: DataStream[Any] = {
-      // check whether all aggregates support partial aggregate
-      if (AggregateUtil.doAllSupportPartialAggregation(
-            namedAggregates.map(_.getKey),
-            inputType,
-            grouping.length)) {
-        // do Incremental Aggregation
-        val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+
+    // check whether all aggregates support partial aggregate
+    if (AggregateUtil.doAllSupportPartialAggregation(
+          namedAggregates.map(_.getKey),
+          inputType,
+          grouping.length)) {
+      // do Incremental Aggregation
+      val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+        namedAggregates,
+        inputType,
+        getRowType,
+        grouping)
+      // grouped / keyed aggregation
+      if (groupingKeys.length > 0) {
+        val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
+          window,
           namedAggregates,
           inputType,
-          getRowType,
-          grouping)
-        // grouped / keyed aggregation
-        if (groupingKeys.length > 0) {
-          val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-          val windowedStream =
-            createKeyedWindowedStream(window, keyedStream)
-            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-          windowedStream
-          .apply(reduceFunction, windowFunction)
-          .returns(rowTypeInfo)
-          .name(keyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
-        // global / non-keyed aggregation
-        else {
-          val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val windowedStream =
-            createNonKeyedWindowedStream(window, mappedInput)
-            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-          windowedStream
-          .apply(reduceFunction, windowFunction)
-          .returns(rowTypeInfo)
-          .name(nonKeyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+        val windowedStream =
+          createKeyedWindowedStream(window, keyedStream)
+          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+        windowedStream
+        .reduce(reduceFunction, windowFunction)
+        .returns(rowTypeInfo)
+        .name(keyedAggOpName)
       }
+      // global / non-keyed aggregation
       else {
-        // do non-Incremental Aggregation
-        // grouped / keyed aggregation
-        if (groupingKeys.length > 0) {
-
-          val windowFunction = AggregateUtil.createWindowAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-          val windowedStream =
-            createKeyedWindowedStream(window, keyedStream)
-            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-          windowedStream
-          .apply(windowFunction)
-          .returns(rowTypeInfo)
-          .name(keyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
-        // global / non-keyed aggregation
-        else {
-          val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
-            window,
-            namedAggregates,
-            inputType,
-            rowRelDataType,
-            grouping,
-            namedProperties)
-
-          val windowedStream =
-            createNonKeyedWindowedStream(window, mappedInput)
-            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-          windowedStream
-          .apply(windowFunction)
-          .returns(rowTypeInfo)
-          .name(nonKeyedAggOpName)
-          .asInstanceOf[DataStream[Any]]
-        }
+        val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        val windowedStream =
+          createNonKeyedWindowedStream(window, mappedInput)
+          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+        windowedStream
+        .reduce(reduceFunction, windowFunction)
+        .returns(rowTypeInfo)
+        .name(nonKeyedAggOpName)
       }
     }
+    else {
+      // do non-Incremental Aggregation
+      // grouped / keyed aggregation
+      if (groupingKeys.length > 0) {
+
+        val windowFunction = AggregateUtil.createWindowAggregationFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+        val windowedStream =
+          createKeyedWindowedStream(window, keyedStream)
+          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
 
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(getConversionMapper(
-          config = config,
-          nullableInput = false,
-          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType = expectedType.get,
-          conversionOperatorName = "DataStreamAggregateConversion",
-          fieldNames = getRowType.getFieldNames.asScala
-        ))
-          .name(mapName)
-      case _ => result
+        windowedStream
+        .apply(windowFunction)
+        .returns(rowTypeInfo)
+        .name(keyedAggOpName)
+      }
+      // global / non-keyed aggregation
+      else {
+        val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        val windowedStream =
+          createNonKeyedWindowedStream(window, mappedInput)
+          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+        windowedStream
+        .apply(windowFunction)
+        .returns(rowTypeInfo)
+        .name(nonKeyedAggOpName)
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 43f1fb6..b39ae4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -22,13 +22,13 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.nodes.FlinkCalc
-import org.apache.flink.table.typeutils.TypeConverter._
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode which matches along with FlatMapOperator.
@@ -42,7 +42,7 @@ class DataStreamCalc(
     private[flink] val calcProgram: RexProgram,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
-  with FlinkCalc
+  with CommonCalc
   with DataStreamRel {
 
   override def deriveRowType() = rowRelDataType
@@ -68,20 +68,12 @@ class DataStreamCalc(
         calcProgram.getCondition != null)
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
 
     val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
     val generator = new CodeGenerator(config, false, inputDataStream.getType)
 
     val body = functionBody(
@@ -89,14 +81,13 @@ class DataStreamCalc(
       inputDataStream.getType,
       getRowType,
       calcProgram,
-      config,
-      expectedType)
+      config)
 
     val genFunction = generator.generateFunction(
       ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
+      classOf[FlatMapFunction[Row, Row]],
       body,
-      returnType)
+      FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val mapFunc = calcMapFunction(genFunction)
     inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index bd65954..dd799e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -24,11 +24,11 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.table.typeutils.TypeConverter._
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode which matches along with join a user defined table function.
@@ -44,7 +44,7 @@ class DataStreamCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkCorrelate
+  with CommonCorrelate
   with DataStreamRel {
 
   override def deriveRowType() = relRowType
@@ -79,10 +79,7 @@ class DataStreamCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
 
@@ -103,7 +100,6 @@ class DataStreamCorrelate(
       joinType,
       rexCall,
       condition,
-      expectedType,
       Some(pojoFieldMapping),
       ruleDescription)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 16427b8..6f20831 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.plan.nodes.FlinkRel
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.FlinkRel
+import org.apache.flink.types.Row
 
 trait DataStreamRel extends RelNode with FlinkRel {
 
@@ -30,16 +30,9 @@ trait DataStreamRel extends RelNode with FlinkRel {
     * Translates the FlinkRelNode into a Flink operator.
     *
     * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
-    * @param expectedType specifies the type the Flink operator should return. The type must
-    *                     have the same arity as the result. For instance, if the
-    *                     expected type is a RowTypeInfo this method will return a DataSet of
-    *                     type Row. If the expected type is Tuple2, the operator will return
-    *                     a Tuple2 if possible. Row otherwise.
-    * @return DataStream of type expectedType or RowTypeInfo
+    * @return DataStream of type [[Row]]
     */
-  def translateToPlan(
-    tableEnv: StreamTableEnvironment,
-    expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any]
+  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
 
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 2d5ec09..e8d218e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.plan.schema.DataStreamTable
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.types.Row
 
 /**
   * Flink RelNode which matches along with DataStreamSource.
@@ -51,14 +51,12 @@ class DataStreamScan(
     )
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
 
-    convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config)
+    convertToInternalRow(inputDataStream, dataStreamTable, config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index beb15d2..f676176 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -60,9 +60,7 @@ class DataStreamUnion(
     s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index f2a3d72..0ab4a48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.table.typeutils.TypeConverter._
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.io.ValuesInputFormat
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -57,18 +57,11 @@ class DataStreamValues(
     )
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
 
-    val returnType = determineReturnType(
-      getRowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
     val generator = new CodeGenerator(config)
 
@@ -86,12 +79,12 @@ class DataStreamValues(
       generatedRecords.map(_.code),
       returnType)
 
-    val inputFormat = new ValuesInputFormat[Any](
+    val inputFormat = new ValuesInputFormat[Row](
       generatedFunction.name,
       generatedFunction.code,
       generatedFunction.returnType)
 
-    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
+    tableEnv.execEnv.createInput(inputFormat, returnType)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index ddac958..56f7f27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -19,17 +19,13 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.runtime.MapRunner
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -39,69 +35,37 @@ abstract class StreamScan(
     traitSet: RelTraitSet,
     table: RelOptTable)
   extends TableScan(cluster, traitSet, table)
+  with CommonScan
   with DataStreamRel {
 
-  protected def convertToExpectedType(
+  protected def convertToInternalRow(
       input: DataStream[Any],
       flinkTable: FlinkTable[_],
-      expectedType: Option[TypeInformation[Any]],
-      config: TableConfig): DataStream[Any] = {
+      config: TableConfig)
+    : DataStream[Row] = {
 
     val inputType = input.getType
 
-    expectedType match {
-
-      // special case:
-      // if efficient type usage is enabled and no expected type is set
-      // we can simply forward the DataSet to the next operator.
-      // however, we cannot forward PojoTypes as their fields don't have an order
-      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
-        input
-
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
+    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-        // conversion
-        if (determinedType != inputType) {
-          val generator = new CodeGenerator(
-            config,
-            nullableInput = false,
-            input.getType,
-            flinkTable.fieldIndexes)
+    // conversion
+    if (needsConversion(inputType, internalType)) {
 
-          val conversion = generator.generateConverterResultExpression(
-            determinedType,
-            getRowType.getFieldNames)
+      val mapFunc = getConversionMapper(
+        config,
+        inputType,
+        internalType,
+        "DataStreamSourceConversion",
+        getRowType.getFieldNames,
+        Some(flinkTable.fieldIndexes))
 
-          val body =
-            s"""
-               |${conversion.code}
-               |return ${conversion.resultTerm};
-               |""".stripMargin
+      val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-          val genFunction = generator.generateFunction(
-            "DataSetSourceConversion",
-            classOf[MapFunction[Any, Any]],
-            body,
-            determinedType)
-
-          val mapFunc = new MapRunner[Any, Any](
-            genFunction.name,
-            genFunction.code,
-            genFunction.returnType)
-
-          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          input.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          input
-        }
+      input.map(mapFunc).name(opName)
+    }
+    // no conversion necessary, forward
+    else {
+      input.asInstanceOf[DataStream[Row]]
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 7550593..73d0291 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.StreamTableSource
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -62,15 +62,13 @@ class StreamTableSourceScan(
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
   }
 
-  override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = tableSource
       .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
 
-    convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
+    convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index e89f14f..034ff9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -67,9 +67,10 @@ object AggregateUtil {
     *
     */
   private[flink] def createPrepareMapFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    groupings: Array[Int],
-    inputType: RelDataType): MapFunction[Any, Row] = {
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType)
+    : MapFunction[Row, Row] = {
 
     val (aggFieldIndexes,aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -83,7 +84,7 @@ object AggregateUtil {
       aggregates,
       aggFieldIndexes,
       groupings,
-      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+      mapReturnType)
 
     mapFunction
   }
@@ -113,11 +114,12 @@ object AggregateUtil {
     * NOTE: this function is only used for time based window on batch tables.
     */
   def createDataSetWindowPrepareMapFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    groupings: Array[Int],
-    inputType: RelDataType,
-    isParserCaseSensitive: Boolean): MapFunction[Any, Row] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType,
+      isParserCaseSensitive: Boolean)
+    : MapFunction[Row, Row] = {
 
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -147,7 +149,7 @@ object AggregateUtil {
       groupings,
       timeFieldPos,
       tumbleTimeWindowSize,
-      mapReturnType).asInstanceOf[MapFunction[Any, Row]]
+      mapReturnType)
   }
 
   /**
@@ -159,13 +161,14 @@ object AggregateUtil {
     * NOTE: this function is only used for window on batch tables.
     */
   def createDataSetWindowAggregationGroupReduceFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty],
-    isInputCombined: Boolean = false): RichGroupReduceFunction[Row, Row] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty],
+      isInputCombined: Boolean = false)
+    : RichGroupReduceFunction[Row, Row] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -269,10 +272,11 @@ object AggregateUtil {
     *
     */
   private[flink] def createDataSetWindowAggregationCombineFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    groupings: Array[Int]): RichGroupCombineFunction[Row,Row] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      groupings: Array[Int])
+    : RichGroupCombineFunction[Row,Row] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -313,11 +317,12 @@ object AggregateUtil {
     *
     */
   private[flink] def createAggregateGroupReduceFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    inGroupingSet: Boolean): RichGroupReduceFunction[Row, Row] = {
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      inGroupingSet: Boolean)
+    : RichGroupReduceFunction[Row, Row] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -370,10 +375,11 @@ object AggregateUtil {
     *
     */
   private[flink] def createIncrementalAggregateReduceFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int])
+    : IncrementalAggregateReduceFunction = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),inputType,groupings.length)._2
@@ -397,13 +403,13 @@ object AggregateUtil {
     * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
     */
   private[flink] def createAllWindowAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty])
-  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
 
     val aggFunction =
       createAggregateGroupReduceFunction(
@@ -427,13 +433,13 @@ object AggregateUtil {
     *
     */
   private[flink] def createWindowAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty])
-  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
 
     val aggFunction =
       createAggregateGroupReduceFunction(
@@ -457,12 +463,13 @@ object AggregateUtil {
     * window aggregates.
     */
   private[flink] def createAllWindowIncrementalAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : AllWindowFunction[Row, Row, DataStreamWindow] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),inputType,groupings.length)._2
@@ -499,12 +506,13 @@ object AggregateUtil {
     * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
     */
   private[flink] def createWindowIncrementalAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType,
+      groupings: Array[Int],
+      properties: Seq[NamedWindowProperty])
+    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),inputType,groupings.length)._2

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
deleted file mode 100644
index a2a120b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.core.JoinRelType._
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkTypeFactory
-
-import scala.collection.JavaConversions._
-
-object TypeConverter {
-
-  val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
-
-  /**
-    * Determines the return type of Flink operators based on the logical fields, the expected
-    * physical type and configuration parameters.
-    *
-    * For example:
-    *   - No physical type expected, only 3 non-null fields and efficient type usage enabled
-    *       -> return Tuple3
-    *   - No physical type expected, efficient type usage enabled, but 3 nullable fields
-    *       -> return Row because Tuple does not support null values
-    *   - Physical type expected
-    *       -> check if physical type is compatible and return it
-    *
-    * @param logicalRowType logical row information
-    * @param expectedPhysicalType expected physical type
-    * @param nullable fields can be nullable
-    * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
-    * @return suitable return type
-    */
-  def determineReturnType(
-      logicalRowType: RelDataType,
-      expectedPhysicalType: Option[TypeInformation[Any]],
-      nullable: Boolean,
-      useEfficientTypes: Boolean)
-    : TypeInformation[Any] = {
-    // convert to type information
-    val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
-      FlinkTypeFactory.toTypeInfo(relDataType.getType)
-    }
-    // field names
-    val logicalFieldNames = logicalRowType.getFieldNames.toList
-
-    val returnType = expectedPhysicalType match {
-      // a certain physical type is expected (but not Row)
-      // check if expected physical type is compatible with logical field type
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        if (typeInfo.getArity != logicalFieldTypes.length) {
-          throw new TableException("Arity of result does not match expected type.")
-        }
-        typeInfo match {
-
-          // POJO type expected
-          case pt: PojoTypeInfo[_] =>
-            logicalFieldNames.zip(logicalFieldTypes) foreach {
-              case (fName, fType) =>
-                val pojoIdx = pt.getFieldIndex(fName)
-                if (pojoIdx < 0) {
-                  throw new TableException(s"POJO does not define field name: $fName")
-                }
-                val expectedTypeInfo = pt.getTypeAt(pojoIdx)
-                if (fType != expectedTypeInfo) {
-                  throw new TableException(s"Result field does not match expected type. " +
-                    s"Expected: $expectedTypeInfo; Actual: $fType")
-                }
-            }
-
-          // Tuple/Case class type expected
-          case ct: CompositeType[_] =>
-            logicalFieldTypes.zipWithIndex foreach {
-              case (fieldTypeInfo, i) =>
-                val expectedTypeInfo = ct.getTypeAt(i)
-                if (fieldTypeInfo != expectedTypeInfo) {
-                  throw new TableException(s"Result field does not match expected type. " +
-                    s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
-                }
-            }
-
-          // Atomic type expected
-          case at: AtomicType[_] =>
-            val fieldTypeInfo = logicalFieldTypes.head
-            if (fieldTypeInfo != at) {
-              throw new TableException(s"Result field does not match expected type. " +
-                s"Expected: $at; Actual: $fieldTypeInfo")
-            }
-
-          case _ =>
-            throw new TableException("Unsupported result type.")
-        }
-        typeInfo
-
-      // Row is expected, create the arity for it
-      case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
-        new RowTypeInfo(logicalFieldTypes: _*)
-
-      // no physical type
-      // determine type based on logical fields and configuration parameters
-      case None =>
-        // no need for efficient types -> use Row
-        // we cannot use efficient types if row arity > tuple arity or nullable
-        if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
-          new RowTypeInfo(logicalFieldTypes: _*)
-        }
-        // use efficient type tuple or atomic type
-        else {
-          if (logicalFieldTypes.length == 1) {
-            logicalFieldTypes.head
-          }
-          else {
-            new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
-          }
-        }
-    }
-    returnType.asInstanceOf[TypeInformation[Any]]
-  }
-
-  def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
-    case INNER => JoinType.INNER
-    case LEFT => JoinType.LEFT_OUTER
-    case RIGHT => JoinType.RIGHT_OUTER
-    case FULL => JoinType.FULL_OUTER
-  }
-
-  def flinkJoinTypeToRelType(joinType: JoinType) = joinType match {
-    case JoinType.INNER => JoinRelType.INNER
-    case JoinType.LEFT_OUTER => JoinRelType.LEFT
-    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
-    case JoinType.FULL_OUTER => JoinRelType.FULL
-  }
-}


[3/3] flink git commit: [FLINK-5662] [table] Rework internal type handling of Table API

Posted by tw...@apache.org.
[FLINK-5662] [table] Rework internal type handling of Table API

This closes #3271.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6b225
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6b225
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6b225

Branch: refs/heads/master
Commit: 6bc6b225e55095eb8797db2903b0546410e7fdd9
Parents: 1ce10c8
Author: twalthr <tw...@apache.org>
Authored: Mon Feb 6 17:18:08 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Mon Feb 13 17:50:00 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  34 ++-
 .../table/api/StreamTableEnvironment.scala      |  41 ++--
 .../apache/flink/table/api/TableConfig.scala    |  24 --
 .../flink/table/api/TableEnvironment.scala      | 130 +++++++++-
 .../flink/table/calcite/FlinkTypeFactory.scala  |  17 +-
 .../flink/table/codegen/CodeGenerator.scala     |  33 +--
 .../flink/table/codegen/ExpressionReducer.scala |  11 +-
 .../apache/flink/table/codegen/generated.scala  |  25 +-
 .../flink/table/plan/logical/operators.scala    |  18 +-
 .../table/plan/nodes/CommonAggregate.scala      |  69 ++++++
 .../flink/table/plan/nodes/CommonCalc.scala     | 152 ++++++++++++
 .../table/plan/nodes/CommonCorrelate.scala      | 229 ++++++++++++++++++
 .../flink/table/plan/nodes/CommonScan.scala     |  82 +++++++
 .../flink/table/plan/nodes/FlinkAggregate.scala |  69 ------
 .../flink/table/plan/nodes/FlinkCalc.scala      | 172 -------------
 .../flink/table/plan/nodes/FlinkCorrelate.scala | 233 ------------------
 .../flink/table/plan/nodes/FlinkRel.scala       |  37 ---
 .../table/plan/nodes/dataset/BatchScan.scala    |  61 ++---
 .../nodes/dataset/BatchTableSourceScan.scala    |   7 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |  82 ++-----
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  27 +--
 .../plan/nodes/dataset/DataSetCorrelate.scala   |  12 +-
 .../plan/nodes/dataset/DataSetIntersect.scala   |  57 +----
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  32 ++-
 .../table/plan/nodes/dataset/DataSetMinus.scala |  57 +----
 .../table/plan/nodes/dataset/DataSetRel.scala   |  17 +-
 .../table/plan/nodes/dataset/DataSetScan.scala  |   8 +-
 .../nodes/dataset/DataSetSingleRowJoin.scala    |  48 ++--
 .../table/plan/nodes/dataset/DataSetSort.scala  |  49 +---
 .../table/plan/nodes/dataset/DataSetUnion.scala |  24 +-
 .../plan/nodes/dataset/DataSetValues.scala      |  18 +-
 .../nodes/dataset/DataSetWindowAggregate.scala  |  68 ++----
 .../nodes/datastream/DataStreamAggregate.scala  | 239 ++++++++-----------
 .../plan/nodes/datastream/DataStreamCalc.scala  |  27 +--
 .../nodes/datastream/DataStreamCorrelate.scala  |  14 +-
 .../plan/nodes/datastream/DataStreamRel.scala   |  15 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |  10 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |   6 +-
 .../nodes/datastream/DataStreamValues.scala     |  23 +-
 .../plan/nodes/datastream/StreamScan.scala      |  84 ++-----
 .../datastream/StreamTableSourceScan.scala      |  12 +-
 .../table/runtime/aggregate/AggregateUtil.scala | 120 +++++-----
 .../flink/table/typeutils/TypeConverter.scala   | 156 ------------
 .../api/java/batch/TableEnvironmentITCase.java  |   7 +-
 .../scala/batch/TableEnvironmentITCase.scala    |   4 +-
 .../batch/utils/TableProgramsTestBase.scala     |  11 +-
 .../expressions/utils/ExpressionTestBase.scala  |  16 +-
 47 files changed, 1184 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index dd0487a..2dec00e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -23,11 +23,12 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.Expression
@@ -135,7 +136,7 @@ abstract class BatchTableEnvironment(
   private[flink] def explain(table: Table, extended: Boolean): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
+    val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new GenericTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
@@ -250,28 +251,39 @@ abstract class BatchTableEnvironment(
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-    val dataSetPlan = optimize(table.getRelNode)
-    translate(dataSetPlan)
+    val relNode = table.getRelNode
+    val dataSetPlan = optimize(relNode)
+    translate(dataSetPlan, relNode.getRowType)
   }
 
   /**
-    * Translates a logical [[RelNode]] into a [[DataSet]].
+    * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary.
     *
     * @param logicalPlan The root node of the relational expression tree.
+    * @param logicalType The row type of the result. Since the logicalPlan can lose the
+    *                    field naming during optimization we pass the row type separately.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataSet]].
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+  protected def translate[A](
+      logicalPlan: RelNode,
+      logicalType: RelDataType)
+      (implicit tpe: TypeInformation[A]): DataSet[A] = {
     TableEnvironment.validateType(tpe)
 
     logicalPlan match {
       case node: DataSetRel =>
-        node.translateToPlan(
-          this,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[DataSet[A]]
-      case _ => ???
+        val plan = node.translateToPlan(this)
+        val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+        conversion match {
+          case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
+          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+        }
+
+      case _ =>
+        throw TableException("Cannot generate DataSet due to an invalid logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 81e884d..19c4af1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -23,10 +23,11 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.explain.PlanJsonParser
@@ -200,11 +201,11 @@ abstract class StreamTableEnvironment(
     dataStream: DataStream[T],
     fields: Array[Expression]): Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray)
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
-      fieldIndexes.toArray,
-      fieldNames.toArray
+      fieldIndexes,
+      fieldNames
     )
     registerTableInternal(name, dataStreamTable)
   }
@@ -255,30 +256,40 @@ abstract class StreamTableEnvironment(
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-    val dataStreamPlan = optimize(table.getRelNode)
-    translate(dataStreamPlan)
+    val relNode = table.getRelNode
+    val dataStreamPlan = optimize(relNode)
+    translate(dataStreamPlan, relNode.getRowType)
   }
 
   /**
     * Translates a logical [[RelNode]] into a [[DataStream]].
     *
     * @param logicalPlan The root node of the relational expression tree.
+    * @param logicalType The row type of the result. Since the logicalPlan can lose the
+    *                    field naming during optimization we pass the row type separately.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A]
-      (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+  protected def translate[A](
+      logicalPlan: RelNode,
+      logicalType: RelDataType)
+      (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
     TableEnvironment.validateType(tpe)
 
     logicalPlan match {
       case node: DataStreamRel =>
-        node.translateToPlan(
-          this,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[DataStream[A]]
-      case _ => ???
+        val plan = node.translateToPlan(this)
+        val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
+        conversion match {
+          case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
+          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+        }
+
+      case _ =>
+        throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
     }
   }
 
@@ -291,7 +302,9 @@ abstract class StreamTableEnvironment(
   def explain(table: Table): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+    val dataStream = translate[Row](
+      optimizedPlan,
+      ast.getRowType)(new GenericTypeInfo(classOf[Row]))
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index a8876a8..6448657 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -37,12 +37,6 @@ class TableConfig {
   private var nullCheck: Boolean = true
 
   /**
-    * Defines if efficient types (such as Tuple types or Atomic types)
-    * should be used within operators where possible.
-    */
-  private var efficientTypeUsage = false
-
-  /**
     * Defines the configuration of Calcite for Table API and SQL queries.
     */
   private var calciteConfig = CalciteConfig.DEFAULT
@@ -73,24 +67,6 @@ class TableConfig {
   }
 
   /**
-    * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types
-    * or Atomic types) are used within operators where possible.
-    *
-    * NOTE: Currently, this is an experimental feature.
-    */
-  def getEfficientTypeUsage = efficientTypeUsage
-
-  /**
-    * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types
-    * or Atomic types) are used within operators where possible.
-    *
-    * NOTE: Currently, this is an experimental feature.
-    */
-  def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = {
-    this.efficientTypeUsage = efficientTypeUsage
-  }
-
-  /**
     * Returns the current configuration of Calcite for Table API and SQL queries.
     */
   def getCalciteConfig: CalciteConfig = calciteConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bcff1fb..b36441a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -31,27 +31,30 @@ import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
-import java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.codegen.ExpressionReducer
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
-import org.apache.flink.table.plan.schema.{RelTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.RelTable
+import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
 
@@ -410,7 +413,7 @@ abstract class TableEnvironment(val config: TableConfig) {
         }
         exprs.map {
           case UnresolvedFieldReference(name) => (0, name)
-          case _ => throw new TableException("Field reference expression expected.")
+          case _ => throw new TableException("Field reference expression requested.")
         }
       case t: TupleTypeInfo[A] =>
         exprs.zipWithIndex.map {
@@ -466,6 +469,123 @@ abstract class TableEnvironment(val config: TableConfig) {
     (fieldNames.toArray, fieldIndexes.toArray)
   }
 
+  /**
+    * Creates a final converter that maps the internal row type to external type.
+    *
+    * @param physicalRowTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Must not be unique but has to be a
+    *                     valid Java class identifier.
+    */
+  protected def sinkConversion[T](
+      physicalRowTypeInfo: TypeInformation[Row],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[T],
+      functionName: String)
+    : Option[MapFunction[Row, T]] = {
+
+    // validate that at least the field types of physical and logical type match
+    // we do that here to make sure that plan translation was correct
+    val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
+    if (physicalRowTypeInfo != logicalRowTypeInfo) {
+      throw TableException("The field types of physical and logical row types do not match." +
+        "This is a bug and should not happen. Please file an issue.")
+    }
+
+    // requested type is a generic Row, no conversion needed
+    if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
+          requestedTypeInfo.getTypeClass == classOf[Row]) {
+      return None
+    }
+
+    // convert to type information
+    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
+      FlinkTypeFactory.toTypeInfo(relDataType.getType)
+    }
+    // field names
+    val logicalFieldNames = logicalRowType.getFieldNames.asScala
+
+    // validate requested type
+    if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
+      throw new TableException("Arity of result does not match requested type.")
+    }
+    requestedTypeInfo match {
+
+      // POJO type requested
+      case pt: PojoTypeInfo[_] =>
+        logicalFieldNames.zip(logicalFieldTypes) foreach {
+          case (fName, fType) =>
+            val pojoIdx = pt.getFieldIndex(fName)
+            if (pojoIdx < 0) {
+              throw new TableException(s"POJO does not define field name: $fName")
+            }
+            val requestedTypeInfo = pt.getTypeAt(pojoIdx)
+            if (fType != requestedTypeInfo) {
+              throw new TableException(s"Result field does not match requested type. " +
+                s"requested: $requestedTypeInfo; Actual: $fType")
+            }
+        }
+
+      // Tuple/Case class/Row type requested
+      case tt: TupleTypeInfoBase[_] =>
+        logicalFieldTypes.zipWithIndex foreach {
+          case (fieldTypeInfo, i) =>
+            val requestedTypeInfo = tt.getTypeAt(i)
+            if (fieldTypeInfo != requestedTypeInfo) {
+              throw new TableException(s"Result field does not match requested type. " +
+                s"Requested: $requestedTypeInfo; Actual: $fieldTypeInfo")
+            }
+        }
+
+      // Atomic type requested
+      case at: AtomicType[_] =>
+        if (logicalFieldTypes.size != 1) {
+          throw new TableException(s"Requested result type is an atomic type but " +
+            s"result has more or less than a single field.")
+        }
+        val fieldTypeInfo = logicalFieldTypes.head
+        if (fieldTypeInfo != at) {
+          throw new TableException(s"Result field does not match requested type. " +
+            s"Requested: $at; Actual: $fieldTypeInfo")
+        }
+
+      case _ =>
+        throw new TableException(s"Unsupported result type: $requestedTypeInfo")
+    }
+
+    // code generate MapFunction
+    val generator = new CodeGenerator(
+      config,
+      false,
+      physicalRowTypeInfo,
+      None,
+      None)
+
+    val conversion = generator.generateConverterResultExpression(
+      requestedTypeInfo,
+      logicalFieldNames)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    val genFunction = generator.generateFunction(
+      functionName,
+      classOf[MapFunction[Row, T]],
+      body,
+      requestedTypeInfo)
+
+    val mapFunction = new MapRunner[Row, T](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+    Some(mapFunction)
+  }
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index f3e2f91..251be14 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
@@ -36,8 +36,10 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
 import org.apache.flink.table.plan.schema.ArrayRelDataType
 import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+import org.apache.flink.types.Row
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 /**
   * Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
@@ -167,6 +169,19 @@ object FlinkTypeFactory {
         throw TableException(s"Type is not supported: $t")
   }
 
+  /**
+    * Converts a Calcite logical record into a Flink type information.
+    */
+  def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
+    // convert to type information
+    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
+      FlinkTypeFactory.toTypeInfo(relDataType.getType)
+    }
+    // field names
+    val logicalFieldNames = logicalRowType.getFieldNames.asScala
+    new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray)
+  }
+
   def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
     case BOOLEAN => BOOLEAN_TYPE_INFO
     case TINYINT => BYTE_TYPE_INFO

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index d49d7a0..c679bd8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -40,8 +40,8 @@ import org.apache.flink.table.codegen.calls.FunctionGenerator
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.runtime.TableFunctionCollector
-import org.apache.flink.table.typeutils.TypeConverter
 import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -62,8 +62,8 @@ import scala.collection.mutable
 class CodeGenerator(
    config: TableConfig,
    nullableInput: Boolean,
-   input1: TypeInformation[Any],
-   input2: Option[TypeInformation[Any]] = None,
+   input1: TypeInformation[_ <: Any],
+   input2: Option[TypeInformation[_ <: Any]] = None,
    input1PojoFieldMapping: Option[Array[Int]] = None,
    input2PojoFieldMapping: Option[Array[Int]] = None)
   extends RexVisitor[GeneratedExpression] {
@@ -112,7 +112,7 @@ class CodeGenerator(
     * @param config configuration that determines runtime behavior
     */
   def this(config: TableConfig) =
-    this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None)
+    this(config, false, new RowTypeInfo(), None, None)
 
   // set of member statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
@@ -224,15 +224,16 @@ class CodeGenerator(
     * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
     *                 output record can be accessed via the given term methods.
     * @param returnType expected return type
-    * @tparam T Flink Function to be generated.
+    * @tparam F Flink Function to be generated.
+    * @tparam T Return type of the Flink Function.
     * @return instance of GeneratedFunction
     */
-  def generateFunction[T <: Function](
+  def generateFunction[F <: Function, T <: Any](
       name: String,
-      clazz: Class[T],
+      clazz: Class[F],
       bodyCode: String,
-      returnType: TypeInformation[Any])
-    : GeneratedFunction[T] = {
+      returnType: TypeInformation[T])
+    : GeneratedFunction[F, T] = {
     val funcName = newName(name)
 
     // Janino does not support generics, that's why we need
@@ -298,14 +299,14 @@ class CodeGenerator(
     *             valid Java class identifier.
     * @param records code for creating records
     * @param returnType expected return type
-    * @tparam T Flink Function to be generated.
+    * @tparam T Return type of the Flink Function.
     * @return instance of GeneratedFunction
     */
-  def generateValuesInputFormat[T](
+  def generateValuesInputFormat[T <: Row](
       name: String,
       records: Seq[String],
-      returnType: TypeInformation[Any])
-    : GeneratedFunction[GenericInputFormat[T]] = {
+      returnType: TypeInformation[T])
+    : GeneratedInput[GenericInputFormat[T], T] = {
     val funcName = newName(name)
 
     addReusableOutRecord(returnType)
@@ -343,7 +344,7 @@ class CodeGenerator(
       }
     """.stripMargin
 
-    GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode)
+    GeneratedInput(funcName, returnType, funcCode)
   }
 
   /**
@@ -1094,7 +1095,7 @@ class CodeGenerator(
   // ----------------------------------------------------------------------------------------------
 
   private def generateInputAccess(
-      inputType: TypeInformation[Any],
+      inputType: TypeInformation[_ <: Any],
       inputTerm: String,
       index: Int,
       pojoFieldMapping: Option[Array[Int]])
@@ -1122,7 +1123,7 @@ class CodeGenerator(
   }
 
   private def generateNullableInputFieldAccess(
-      inputType: TypeInformation[Any],
+      inputType: TypeInformation[_ <: Any],
       inputTerm: String,
       index: Int,
       pojoFieldMapping: Option[Array[Int]])

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 94007de..0f1de21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -24,11 +24,10 @@ import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.TypeConverter
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
@@ -39,7 +38,7 @@ import scala.collection.JavaConverters._
 class ExpressionReducer(config: TableConfig)
   extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
 
-  private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
+  private val EMPTY_ROW_INFO = new RowTypeInfo()
   private val EMPTY_ROW = new Row(0)
 
   override def reduce(
@@ -82,14 +81,14 @@ class ExpressionReducer(config: TableConfig)
       resultType.getFieldNames,
       literals)
 
-    val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
+    val generatedFunction = generator.generateFunction[MapFunction[Row, Row], Row](
       "ExpressionReducer",
       classOf[MapFunction[Row, Row]],
       s"""
         |${result.code}
         |return ${result.resultTerm};
         |""".stripMargin,
-      resultType.asInstanceOf[TypeInformation[Any]])
+      resultType)
 
     val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
     val function = clazz.newInstance()

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
index b4c293d..271f686 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.codegen
 
+import org.apache.flink.api.common.functions
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 /**
@@ -41,14 +44,32 @@ object GeneratedExpression {
 }
 
 /**
-  * Describes a generated [[org.apache.flink.api.common.functions.Function]]
+  * Describes a generated [[functions.Function]]
   *
   * @param name class name of the generated Function.
   * @param returnType the type information of the result type
   * @param code code of the generated Function.
+  * @tparam F type of function
   * @tparam T type of function
   */
-case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
+case class GeneratedFunction[F <: Function, T <: Any](
+  name: String,
+  returnType: TypeInformation[T],
+  code: String)
+
+/**
+  * Describes a generated [[InputFormat]].
+  *
+  * @param name class name of the generated input function.
+  * @param returnType the type information of the result type
+  * @param code code of the generated Function.
+  * @tparam F type of function
+  * @tparam T type of function
+  */
+case class GeneratedInput[F <: InputFormat[_, _], T <: Any](
+  name: String,
+  returnType: TypeInformation[T],
+  code: String)
 
 /**
   * Describes a generated [[org.apache.flink.util.Collector]].

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 3ba0285..20f810a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -22,14 +22,13 @@ import java.util
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.CorrelationId
-import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableFunctionScan}
+import org.apache.calcite.rel.core.{CorrelationId, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
 import org.apache.calcite.rex.{RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.table._
 import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment, UnresolvedException}
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.expressions._
@@ -37,7 +36,6 @@ import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
-import org.apache.flink.table.typeutils.TypeConverter
 import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
 
 import scala.collection.JavaConverters._
@@ -426,11 +424,18 @@ case class Join(
     }
 
     relBuilder.join(
-      TypeConverter.flinkJoinTypeToRelType(joinType),
+      convertJoinType(joinType),
       condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
       corSet.asJava)
   }
 
+  private def convertJoinType(joinType: JoinType) = joinType match {
+    case JoinType.INNER => JoinRelType.INNER
+    case JoinType.LEFT_OUTER => JoinRelType.LEFT
+    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
+    case JoinType.FULL_OUTER => JoinRelType.FULL
+  }
+
   private def ambiguousName: Set[String] =
     left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
 
@@ -481,13 +486,12 @@ case class Join(
         if (checkIfFilterCondition(x)) {
           localPredicateFound = true
         }
-      case x: BinaryComparison => {
+      case x: BinaryComparison =>
         if (checkIfFilterCondition(x)) {
           localPredicateFound = true
         } else {
           nonEquiJoinPredicateFound = true
         }
-      }
       case x => failValidation(
         s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
new file mode 100644
index 0000000..3883b14
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+
+import scala.collection.JavaConverters._
+
+trait CommonAggregate {
+
+  private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
+
+    val inFields = inputType.getFieldNames.asScala
+    grouping.map( inFields(_) ).mkString(", ")
+  }
+
+  private[flink] def aggregationToString(
+      inputType: RelDataType,
+      grouping: Array[Int],
+      rowType: RelDataType,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      namedProperties: Seq[NamedWindowProperty])
+    : String = {
+
+    val inFields = inputType.getFieldNames.asScala
+    val outFields = rowType.getFieldNames.asScala
+
+    val groupStrings = grouping.map( inFields(_) )
+
+    val aggs = namedAggregates.map(_.getKey)
+    val aggStrings = aggs.map( a => s"${a.getAggregation}(${
+      if (a.getArgList.size() > 0) {
+        inFields(a.getArgList.get(0))
+      } else {
+        "*"
+      }
+    })")
+
+    val propStrings = namedProperties.map(_.property.toString)
+
+    (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
+      case (f, o) => if (f == o) {
+        f
+      } else {
+        s"$f AS $o"
+      }
+    }.mkString(", ")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
new file mode 100644
index 0000000..3f46258
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.runtime.FlatMapRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+trait CommonCalc {
+
+  private[flink] def functionBody(
+      generator: CodeGenerator,
+      inputType: TypeInformation[Row],
+      rowType: RelDataType,
+      calcProgram: RexProgram,
+      config: TableConfig)
+    : String = {
+
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+
+    val condition = calcProgram.getCondition
+    val expandedExpressions = calcProgram.getProjectList.map(
+      expr => calcProgram.expandLocalRef(expr))
+    val projection = generator.generateResultExpression(
+      returnType,
+      rowType.getFieldNames,
+      expandedExpressions)
+
+    // only projection
+    if (condition == null) {
+      s"""
+        |${projection.code}
+        |${generator.collectorTerm}.collect(${projection.resultTerm});
+        |""".stripMargin
+    }
+    else {
+      val filterCondition = generator.generateExpression(
+        calcProgram.expandLocalRef(calcProgram.getCondition))
+      // only filter
+      if (projection == null) {
+        s"""
+          |${filterCondition.code}
+          |if (${filterCondition.resultTerm}) {
+          |  ${generator.collectorTerm}.collect(${generator.input1Term});
+          |}
+          |""".stripMargin
+      }
+      // both filter and projection
+      else {
+        s"""
+          |${filterCondition.code}
+          |if (${filterCondition.resultTerm}) {
+          |  ${projection.code}
+          |  ${generator.collectorTerm}.collect(${projection.resultTerm});
+          |}
+          |""".stripMargin
+      }
+    }
+  }
+
+  private[flink] def calcMapFunction(
+      genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row])
+    : RichFlatMapFunction[Row, Row] = {
+
+    new FlatMapRunner[Row, Row](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+  }
+
+  private[flink] def conditionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val cond = calcProgram.getCondition
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+
+    if (cond != null) {
+      expression(cond, inFields, Some(localExprs))
+    } else {
+      ""
+    }
+  }
+
+  private[flink] def selectionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val proj = calcProgram.getProjectList.asScala.toList
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
+
+    proj
+      .map(expression(_, inFields, Some(localExprs)))
+      .zip(outFields).map { case (e, o) =>
+        if (e != o) {
+          e + " AS " + o
+        } else {
+          e
+        }
+    }.mkString(", ")
+  }
+
+  private[flink] def calcOpName(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+    val conditionStr = conditionToString(calcProgram, expression)
+    val selectionStr = selectionToString(calcProgram, expression)
+
+    s"${if (calcProgram.getCondition != null) {
+      s"where: ($conditionStr), "
+    } else {
+      ""
+    }}select: ($selectionStr)"
+  }
+
+  private[flink] def calcToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+    val name = calcOpName(calcProgram, expression)
+    s"Calc($name)"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
new file mode 100644
index 0000000..61b7ffb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * Join a user-defined table function
+  */
+trait CommonCorrelate {
+
+  /**
+    * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
+    * and user-defined table function.
+    */
+  private[flink] def correlateMapFunction(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      rowType: RelDataType,
+      joinType: SemiJoinType,
+      rexCall: RexCall,
+      condition: Option[RexNode],
+      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
+      ruleDescription: String)
+    : CorrelateFlatMapRunner[Row, Row] = {
+
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+
+    val flatMap = generateFunction(
+      config,
+      inputTypeInfo,
+      udtfTypeInfo,
+      returnType,
+      rowType,
+      joinType,
+      rexCall,
+      pojoFieldMapping,
+      ruleDescription)
+
+    val collector = generateCollector(
+      config,
+      inputTypeInfo,
+      udtfTypeInfo,
+      returnType,
+      rowType,
+      condition,
+      pojoFieldMapping)
+
+    new CorrelateFlatMapRunner[Row, Row](
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      flatMap.returnType)
+
+  }
+
+  /**
+    * Generates the flat map function to run the user-defined table function.
+    */
+  private def generateFunction(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
+      rowType: RelDataType,
+      joinType: SemiJoinType,
+      rexCall: RexCall,
+      pojoFieldMapping: Option[Array[Int]],
+      ruleDescription: String)
+    : GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+
+    val functionGenerator = new CodeGenerator(
+      config,
+      false,
+      inputTypeInfo,
+      Some(udtfTypeInfo),
+      None,
+      pojoFieldMapping)
+
+    val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs
+
+    val collectorTerm = functionGenerator
+      .addReusableConstructor(classOf[TableFunctionCollector[_]])
+      .head
+
+    val call = functionGenerator.generateExpression(rexCall)
+    var body =
+      s"""
+        |${call.resultTerm}.setCollector($collectorTerm);
+        |${call.code}
+        |""".stripMargin
+
+    if (joinType == SemiJoinType.LEFT) {
+      // left outer join
+
+      // in case of left outer join and the returned row of table function is empty,
+      // fill all fields of row with null
+      val input2NullExprs = input2AccessExprs.map { x =>
+        GeneratedExpression(
+          primitiveDefaultValue(x.resultType),
+          ALWAYS_NULL,
+          NO_CODE,
+          x.resultType)
+      }
+      val outerResultExpr = functionGenerator.generateResultExpression(
+        input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
+      body +=
+        s"""
+          |boolean hasOutput = $collectorTerm.isCollected();
+          |if (!hasOutput) {
+          |  ${outerResultExpr.code}
+          |  ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+          |}
+          |""".stripMargin
+    } else if (joinType != SemiJoinType.INNER) {
+      throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+    }
+
+    functionGenerator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Row, Row]],
+      body,
+      returnType)
+  }
+
+  /**
+    * Generates table function collector.
+    */
+  private[flink] def generateCollector(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
+      rowType: RelDataType,
+      condition: Option[RexNode],
+      pojoFieldMapping: Option[Array[Int]])
+    : GeneratedCollector = {
+
+    val generator = new CodeGenerator(
+      config,
+      false,
+      inputTypeInfo,
+      Some(udtfTypeInfo),
+      None,
+      pojoFieldMapping)
+
+    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+
+    val crossResultExpr = generator.generateResultExpression(
+      input1AccessExprs ++ input2AccessExprs,
+      returnType,
+      rowType.getFieldNames.asScala)
+
+    val collectorCode = if (condition.isEmpty) {
+      s"""
+        |${crossResultExpr.code}
+        |getCollector().collect(${crossResultExpr.resultTerm});
+        |""".stripMargin
+    } else {
+      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
+      filterGenerator.input1Term = filterGenerator.input2Term
+      val filterCondition = filterGenerator.generateExpression(condition.get)
+      s"""
+        |${filterGenerator.reuseInputUnboxingCode()}
+        |${filterCondition.code}
+        |if (${filterCondition.resultTerm}) {
+        |  ${crossResultExpr.code}
+        |  getCollector().collect(${crossResultExpr.resultTerm});
+        |}
+        |""".stripMargin
+    }
+
+    generator.generateTableFunctionCollector(
+      "TableFunctionCollector",
+      collectorCode,
+      udtfTypeInfo)
+  }
+
+  private[flink] def selectToString(rowType: RelDataType): String = {
+    rowType.getFieldNames.asScala.mkString(",")
+  }
+
+  private[flink] def correlateOpName(
+      rexCall: RexCall,
+      sqlFunction: TableSqlFunction,
+      rowType: RelDataType)
+    : String = {
+
+    s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
+  }
+
+  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
+    val udtfName = sqlFunction.getName
+    val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
+    s"table($udtfName($operands))"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
new file mode 100644
index 0000000..274b602
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.types.Row
+
+/**
+  * Common class for batch and stream scans.
+  */
+trait CommonScan {
+
+  /**
+    * We check if the input type is exactly the same as the internal row type.
+    * A conversion is necessary if types differ.
+    */
+  private[flink] def needsConversion(
+      externalTypeInfo: TypeInformation[Any],
+      internalTypeInfo: TypeInformation[Row])
+    : Boolean = {
+
+    externalTypeInfo != internalTypeInfo
+  }
+
+  private[flink] def getConversionMapper(
+      config: TableConfig,
+      inputType: TypeInformation[Any],
+      expectedType: TypeInformation[Row],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputPojoFieldMapping: Option[Array[Int]] = None)
+    : MapFunction[Any, Row] = {
+
+    val generator = new CodeGenerator(
+      config,
+      false,
+      inputType,
+      None,
+      inputPojoFieldMapping)
+    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    val genFunction = generator.generateFunction(
+      conversionOperatorName,
+      classOf[MapFunction[Any, Row]],
+      body,
+      expectedType)
+
+    new MapRunner[Any, Row](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
deleted file mode 100644
index 7290594..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.flink.table.calcite.FlinkRelBuilder
-import FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.runtime.aggregate.AggregateUtil._
-
-import scala.collection.JavaConverters._
-
-trait FlinkAggregate {
-
-  private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
-
-    val inFields = inputType.getFieldNames.asScala
-    grouping.map( inFields(_) ).mkString(", ")
-  }
-
-  private[flink] def aggregationToString(
-      inputType: RelDataType,
-      grouping: Array[Int],
-      rowType: RelDataType,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      namedProperties: Seq[NamedWindowProperty])
-    : String = {
-
-    val inFields = inputType.getFieldNames.asScala
-    val outFields = rowType.getFieldNames.asScala
-
-    val groupStrings = grouping.map( inFields(_) )
-
-    val aggs = namedAggregates.map(_.getKey)
-    val aggStrings = aggs.map( a => s"${a.getAggregation}(${
-      if (a.getArgList.size() > 0) {
-        inFields(a.getArgList.get(0))
-      } else {
-        "*"
-      }
-    })")
-
-    val propStrings = namedProperties.map(_.property.toString)
-
-    (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
-      case (f, o) => if (f == o) {
-        f
-      } else {
-        s"$f AS $o"
-      }
-    }.mkString(", ")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
deleted file mode 100644
index 5ebd3ee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexNode, RexProgram}
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.table.runtime.FlatMapRunner
-import org.apache.flink.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-trait FlinkCalc {
-
-  private[flink] def functionBody(
-    generator: CodeGenerator,
-    inputType: TypeInformation[Any],
-    rowType: RelDataType,
-    calcProgram: RexProgram,
-    config: TableConfig,
-    expectedType: Option[TypeInformation[Any]]): String = {
-
-    val returnType = determineReturnType(
-      rowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val condition = calcProgram.getCondition
-    val expandedExpressions = calcProgram.getProjectList.map(
-      expr => calcProgram.expandLocalRef(expr))
-    val projection = generator.generateResultExpression(
-      returnType,
-      rowType.getFieldNames,
-      expandedExpressions)
-
-      // only projection
-      if (condition == null) {
-        s"""
-          |${projection.code}
-          |${generator.collectorTerm}.collect(${projection.resultTerm});
-          |""".stripMargin
-      }
-      else {
-        val filterCondition = generator.generateExpression(
-          calcProgram.expandLocalRef(calcProgram.getCondition))
-        // only filter
-        if (projection == null) {
-          // conversion
-          if (inputType != returnType) {
-            val conversion = generator.generateConverterResultExpression(
-              returnType,
-              rowType.getFieldNames)
-
-            s"""
-              |${filterCondition.code}
-              |if (${filterCondition.resultTerm}) {
-              |  ${conversion.code}
-              |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-              |}
-              |""".stripMargin
-          }
-          // no conversion
-          else {
-            s"""
-              |${filterCondition.code}
-              |if (${filterCondition.resultTerm}) {
-              |  ${generator.collectorTerm}.collect(${generator.input1Term});
-              |}
-              |""".stripMargin
-          }
-        }
-        // both filter and projection
-        else {
-          s"""
-            |${filterCondition.code}
-            |if (${filterCondition.resultTerm}) {
-            |  ${projection.code}
-            |  ${generator.collectorTerm}.collect(${projection.resultTerm});
-            |}
-            |""".stripMargin
-        }
-      }
-    }
-
-  private[flink] def calcMapFunction(
-      genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = {
-
-    new FlatMapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-  }
-
-  private[flink] def conditionToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
-    val cond = calcProgram.getCondition
-    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-    val localExprs = calcProgram.getExprList.asScala.toList
-
-    if (cond != null) {
-      expression(cond, inFields, Some(localExprs))
-    } else {
-      ""
-    }
-  }
-
-  private[flink] def selectionToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
-    val proj = calcProgram.getProjectList.asScala.toList
-    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-    val localExprs = calcProgram.getExprList.asScala.toList
-    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
-
-    proj
-      .map(expression(_, inFields, Some(localExprs)))
-      .zip(outFields).map { case (e, o) => {
-      if (e != o) {
-        e + " AS " + o
-      } else {
-        e
-      }
-    }
-    }.mkString(", ")
-  }
-
-  private[flink] def calcOpName(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
-    val conditionStr = conditionToString(calcProgram, expression)
-    val selectionStr = selectionToString(calcProgram, expression)
-
-    s"${if (calcProgram.getCondition != null) {
-      s"where: ($conditionStr), "
-    } else {
-      ""
-    }}select: ($selectionStr)"
-  }
-
-  private[flink] def calcToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
-    val name = calcOpName(calcProgram, expression)
-    s"Calc($name)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
deleted file mode 100644
index c986602..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
-import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
-import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
-import org.apache.flink.table.typeutils.TypeConverter._
-import org.apache.flink.table.api.{TableConfig, TableException}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Join a user-defined table function
-  */
-trait FlinkCorrelate {
-
-  /**
-    * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
-    * and user-defined table function.
-    */
-  private[flink] def correlateMapFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      rowType: RelDataType,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      condition: Option[RexNode],
-      expectedType: Option[TypeInformation[Any]],
-      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
-      ruleDescription: String)
-    : CorrelateFlatMapRunner[Any, Any] = {
-
-    val returnType = determineReturnType(
-      rowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val flatMap = generateFunction(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      joinType,
-      rexCall,
-      pojoFieldMapping,
-      ruleDescription)
-
-    val collector = generateCollector(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      condition,
-      pojoFieldMapping)
-
-    new CorrelateFlatMapRunner[Any, Any](
-      flatMap.name,
-      flatMap.code,
-      collector.name,
-      collector.code,
-      flatMap.returnType)
-
-  }
-
-  /**
-    * Generates the flat map function to run the user-defined table function.
-    */
-  private def generateFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      returnType: TypeInformation[Any],
-      rowType: RelDataType,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      pojoFieldMapping: Option[Array[Int]],
-      ruleDescription: String)
-    : GeneratedFunction[FlatMapFunction[Any, Any]] = {
-
-    val functionGenerator = new CodeGenerator(
-      config,
-      false,
-      inputTypeInfo,
-      Some(udtfTypeInfo),
-      None,
-      pojoFieldMapping)
-
-    val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs
-
-    val collectorTerm = functionGenerator
-      .addReusableConstructor(classOf[TableFunctionCollector[_]])
-      .head
-
-    val call = functionGenerator.generateExpression(rexCall)
-    var body =
-      s"""
-        |${call.resultTerm}.setCollector($collectorTerm);
-        |${call.code}
-        |""".stripMargin
-
-    if (joinType == SemiJoinType.LEFT) {
-      // left outer join
-
-      // in case of left outer join and the returned row of table function is empty,
-      // fill all fields of row with null
-      val input2NullExprs = input2AccessExprs.map { x =>
-        GeneratedExpression(
-          primitiveDefaultValue(x.resultType),
-          ALWAYS_NULL,
-          NO_CODE,
-          x.resultType)
-      }
-      val outerResultExpr = functionGenerator.generateResultExpression(
-        input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
-      body +=
-        s"""
-          |boolean hasOutput = $collectorTerm.isCollected();
-          |if (!hasOutput) {
-          |  ${outerResultExpr.code}
-          |  ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
-          |}
-          |""".stripMargin
-    } else if (joinType != SemiJoinType.INNER) {
-      throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
-    }
-
-    functionGenerator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
-      body,
-      returnType)
-  }
-
-  /**
-    * Generates table function collector.
-    */
-  private[flink] def generateCollector(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      returnType: TypeInformation[Any],
-      rowType: RelDataType,
-      condition: Option[RexNode],
-      pojoFieldMapping: Option[Array[Int]])
-    : GeneratedCollector = {
-
-    val generator = new CodeGenerator(
-      config,
-      false,
-      inputTypeInfo,
-      Some(udtfTypeInfo),
-      None,
-      pojoFieldMapping)
-
-    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
-
-    val crossResultExpr = generator.generateResultExpression(
-      input1AccessExprs ++ input2AccessExprs,
-      returnType,
-      rowType.getFieldNames.asScala)
-
-    val collectorCode = if (condition.isEmpty) {
-      s"""
-        |${crossResultExpr.code}
-        |getCollector().collect(${crossResultExpr.resultTerm});
-        |""".stripMargin
-    } else {
-      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
-      filterGenerator.input1Term = filterGenerator.input2Term
-      val filterCondition = filterGenerator.generateExpression(condition.get)
-      s"""
-        |${filterGenerator.reuseInputUnboxingCode()}
-        |${filterCondition.code}
-        |if (${filterCondition.resultTerm}) {
-        |  ${crossResultExpr.code}
-        |  getCollector().collect(${crossResultExpr.resultTerm});
-        |}
-        |""".stripMargin
-    }
-
-    generator.generateTableFunctionCollector(
-      "TableFunctionCollector",
-      collectorCode,
-      udtfTypeInfo)
-  }
-
-  private[flink] def selectToString(rowType: RelDataType): String = {
-    rowType.getFieldNames.asScala.mkString(",")
-  }
-
-  private[flink] def correlateOpName(
-      rexCall: RexCall,
-      sqlFunction: TableSqlFunction,
-      rowType: RelDataType)
-    : String = {
-
-    s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
-  }
-
-  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
-    val udtfName = sqlFunction.getName
-    val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
-    s"table($udtfName($operands))"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index a7765d1..7ad9bd5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -66,43 +66,6 @@ trait FlinkRel {
     }
   }
 
-  private[flink] def getConversionMapper(
-      config: TableConfig,
-      nullableInput: Boolean,
-      inputType: TypeInformation[Any],
-      expectedType: TypeInformation[Any],
-      conversionOperatorName: String,
-      fieldNames: Seq[String],
-      inputPojoFieldMapping: Option[Array[Int]] = None)
-    : MapFunction[Any, Any] = {
-
-    val generator = new CodeGenerator(
-      config,
-      nullableInput,
-      inputType,
-      None,
-      inputPojoFieldMapping)
-    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
-    val body =
-      s"""
-         |${conversion.code}
-         |return ${conversion.resultTerm};
-         |""".stripMargin
-
-    val genFunction = generator.generateFunction(
-      conversionOperatorName,
-      classOf[MapFunction[Any, Any]],
-      body,
-      expectedType)
-
-    new MapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-  }
-
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
     val fieldList = rowType.getFieldList
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index 252bb2e..09262a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -36,6 +36,7 @@ abstract class BatchScan(
     traitSet: RelTraitSet,
     table: RelOptTable)
   extends TableScan(cluster, traitSet, table)
+  with CommonScan
   with DataSetRel {
 
   override def toString: String = {
@@ -48,50 +49,34 @@ abstract class BatchScan(
     planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
   }
 
-  protected def convertToExpectedType(
+  protected def convertToInternalRow(
       input: DataSet[Any],
       flinkTable: FlinkTable[_],
-      expectedType: Option[TypeInformation[Any]],
-      config: TableConfig): DataSet[Any] = {
+      config: TableConfig)
+    : DataSet[Row] = {
 
     val inputType = input.getType
 
-    expectedType match {
+    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-      // special case:
-      // if efficient type usage is enabled and no expected type is set
-      // we can simply forward the DataSet to the next operator.
-      // however, we cannot forward PojoTypes as their fields don't have an order
-      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
-        input
+    // conversion
+    if (needsConversion(inputType, internalType)) {
 
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
+      val mapFunc = getConversionMapper(
+        config,
+        inputType,
+        internalType,
+        "DataSetSourceConversion",
+        getRowType.getFieldNames,
+        Some(flinkTable.fieldIndexes))
 
-        // conversion
-        if (determinedType != inputType) {
+      val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-          val mapFunc = getConversionMapper(
-            config,
-            nullableInput = false,
-            inputType,
-            determinedType,
-            "DataSetSourceConversion",
-            getRowType.getFieldNames,
-            Some(flinkTable.fieldIndexes))
-
-          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          input.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          input
-        }
+      input.map(mapFunc).name(opName)
+    }
+    // no conversion necessary, forward
+    else {
+      input.asInstanceOf[DataSet[Row]]
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 73dddc6..9b8e1ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
 class BatchTableSourceScan(
@@ -62,13 +63,11 @@ class BatchTableSourceScan(
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
 
-    convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
+    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index 6771536..206e562 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -23,19 +23,15 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.table.typeutils.TypeConverter
-import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConverters._
-
 /**
   * Flink RelNode which matches along with a LogicalAggregate.
   */
@@ -49,7 +45,7 @@ class DataSetAggregate(
     grouping: Array[Int],
     inGroupingSet: Boolean)
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
+  with CommonAggregate
   with DataSetRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -89,9 +85,7 @@ class DataSetAggregate(
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }
 
-  override def translateToPlan(
-    tableEnv: BatchTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
@@ -109,15 +103,7 @@ class DataSetAggregate(
       grouping,
       inGroupingSet)
 
-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
-    .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-    .toArray
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
     val prepareOpName = s"prepare select: ($aggString)"
@@ -125,46 +111,26 @@ class DataSetAggregate(
       .map(mapFunction)
       .name(prepareOpName)
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
-    val result = {
-      if (groupingKeys.length > 0) {
-        // grouped aggregation
-        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
-          s"select: ($aggString)"
-
-        mappedInput.asInstanceOf[DataSet[Row]]
-          .groupBy(groupingKeys: _*)
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataSet[Any]]
-      }
-      else {
-        // global aggregation
-        val aggOpName = s"select:($aggString)"
-        mappedInput.asInstanceOf[DataSet[Row]]
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataSet[Any]]
-      }
-    }
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    if (groupingKeys.length > 0) {
+      // grouped aggregation
+      val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+        s"select: ($aggString)"
 
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(getConversionMapper(
-          config = config,
-          nullableInput = false,
-          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType = expectedType.get,
-          conversionOperatorName = "DataSetAggregateConversion",
-          fieldNames = getRowType.getFieldNames.asScala
-        ))
-        .name(mapName)
-      case _ => result
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .groupBy(groupingKeys: _*)
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .name(aggOpName)
+    }
+    else {
+      // global aggregation
+      val aggOpName = s"select:($aggString)"
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .name(aggOpName)
     }
   }
 }