Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:54 UTC

[47/50] [abbrv] flink git commit: [FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/856485be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/856485be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/856485be

Branch: refs/heads/table-retraction
Commit: 856485be3be3bb240a068019fe21da3556f8b26f
Parents: 455a3c5
Author: Hequn Cheng <ch...@gmail.com>
Authored: Tue Apr 18 16:54:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed May 3 11:33:07 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  63 +++-
 .../table/api/StreamTableEnvironment.scala      |  62 +++-
 .../flink/table/api/TableEnvironment.scala      |  65 ++--
 .../flink/table/plan/nodes/CommonCalc.scala     |  26 +-
 .../table/plan/nodes/CommonCorrelate.scala      |  58 +---
 .../flink/table/plan/nodes/CommonScan.scala     |  24 +-
 .../table/plan/nodes/dataset/BatchScan.scala    |  10 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  23 +-
 .../plan/nodes/dataset/DataSetCorrelate.scala   |  32 +-
 .../plan/nodes/dataset/DataSetValues.scala      |   2 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  30 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |  38 ++-
 .../datastream/DataStreamGroupAggregate.scala   |  24 +-
 .../DataStreamGroupWindowAggregate.scala        |  47 ++-
 .../datastream/DataStreamOverAggregate.scala    |  62 ++--
 .../plan/nodes/datastream/DataStreamRel.scala   |   6 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   3 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |   3 +-
 .../nodes/datastream/DataStreamValues.scala     |  14 +-
 .../plan/nodes/datastream/StreamScan.scala      |  23 +-
 .../datastream/StreamTableSourceScan.scala      |   4 +-
 .../plan/nodes/logical/FlinkLogicalCalc.scala   |   2 +-
 .../datastream/DataStreamRetractionRules.scala  |  16 +-
 .../runtime/CRowCorrelateFlatMapRunner.scala    |  83 +++++
 .../flink/table/runtime/CRowFlatMapRunner.scala |  72 ++++
 .../table/runtime/CRowInputMapRunner.scala      |  57 ++++
 .../table/runtime/CRowOutputMapRunner.scala     |  60 ++++
 .../table/runtime/CRowWrappingCollector.scala   |  41 +++
 .../flink/table/runtime/FlatMapRunner.scala     |  17 +-
 .../aggregate/AggregateAggFunction.scala        |  15 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  47 +--
 ...SetSessionWindowAggReduceGroupFunction.scala |   4 +-
 ...taSetSlideWindowAggReduceGroupFunction.scala |   4 +-
 ...TumbleTimeWindowAggReduceGroupFunction.scala |   4 +-
 .../aggregate/GroupAggProcessFunction.scala     |  58 +++-
 ...rementalAggregateAllTimeWindowFunction.scala |   7 +-
 .../IncrementalAggregateAllWindowFunction.scala |  11 +-
 ...IncrementalAggregateTimeWindowFunction.scala |   7 +-
 .../IncrementalAggregateWindowFunction.scala    |  13 +-
 .../aggregate/ProcTimeBoundedRangeOver.scala    |  30 +-
 .../aggregate/ProcTimeBoundedRowsOver.scala     |  26 +-
 .../ProcTimeUnboundedNonPartitionedOver.scala   |  23 +-
 .../ProcTimeUnboundedPartitionedOver.scala      |  22 +-
 .../aggregate/RowTimeBoundedRangeOver.scala     |  32 +-
 .../aggregate/RowTimeBoundedRowsOver.scala      |  30 +-
 .../aggregate/RowTimeUnboundedOver.scala        |  50 +--
 .../aggregate/TimeWindowPropertyCollector.scala |  34 +-
 .../runtime/io/CRowValuesInputFormat.scala      |  59 ++++
 .../table/runtime/io/ValuesInputFormat.scala    |  17 +-
 .../apache/flink/table/runtime/types/CRow.scala |  55 +++
 .../table/runtime/types/CRowComparator.scala    |  83 +++++
 .../table/runtime/types/CRowSerializer.scala    |  78 +++++
 .../table/runtime/types/CRowTypeInfo.scala      |  98 ++++++
 .../apache/flink/table/sinks/CsvTableSink.scala |   2 +
 .../flink/table/TableEnvironmentTest.scala      |  72 +++-
 .../scala/batch/TableEnvironmentITCase.scala    |  20 ++
 .../api/scala/stream/RetractionITCase.scala     | 331 +++++++++++++++++++
 .../api/scala/stream/TableSinkITCase.scala      |   2 +-
 .../api/scala/stream/utils/StreamITCase.scala   |  11 +-
 ...ProcessingOverRangeProcessFunctionTest.scala | 105 +++---
 .../runtime/types/CRowComparatorTest.scala      |  61 ++++
 .../table/utils/MockTableEnvironment.scala      |   8 +
 62 files changed, 1896 insertions(+), 490 deletions(-)
----------------------------------------------------------------------
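
The central new runtime type in this commit is CRow ("change row"): a mutable wrapper that pairs a Row with a boolean change flag marking the record as an accumulate (true) or a retract (false) message. The CRow.scala file listed above is truncated from this excerpt, so the following is a minimal sketch inferred from its usages in the hunks below (new CRow(null, true), value.row); the committed class may differ in detail, and its serializer and type information live in the separate CRowSerializer.scala and CRowTypeInfo.scala files from the summary.

    import org.apache.flink.types.Row

    // Minimal sketch of the CRow wrapper, inferred from its usages in this diff.
    // change == true marks an accumulate message, change == false a retraction.
    class CRow(var row: Row, var change: Boolean) {
      def this() = this(null, true)
      override def toString: String = s"${if (change) "+" else "-"}$row"
    }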


http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 00cf11c..34f5018 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -26,16 +26,20 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 import org.apache.flink.types.Row
@@ -128,6 +132,56 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+    * Creates a final converter that maps the internal row type to external type.
+    *
+    * @param physicalTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Does not need to be unique,
+    *                     but must be a valid Java class identifier.
+    */
+  override protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    Option[MapFunction[IN, OUT]] = {
+
+    if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+      // Row to Row, no conversion needed
+      None
+    } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
+      // Row to CRow, only needs to be wrapped
+      Some(
+        new RichMapFunction[Row, CRow] {
+          private var outCRow: CRow = _
+          override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true)
+          override def map(value: Row): CRow = {
+            outCRow.row = value
+            outCRow
+          }
+        }.asInstanceOf[MapFunction[IN, OUT]]
+      )
+    } else {
+      // some type that is neither Row nor CRow
+
+      val converterFunction = generateRowConverterFunction[OUT](
+        physicalTypeInfo.asInstanceOf[TypeInformation[Row]],
+        logicalRowType,
+        requestedTypeInfo,
+        functionName
+      )
+
+      val mapFunction = new MapRunner[IN, OUT](
+        converterFunction.name,
+        converterFunction.code,
+        converterFunction.returnType)
+
+      Some(mapFunction)
+    }
+  }
+
+  /**
     * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
     * the result of the given [[Table]].
     *
@@ -285,10 +339,15 @@ abstract class BatchTableEnvironment(
     logicalPlan match {
       case node: DataSetRel =>
         val plan = node.translateToPlan(this)
-        val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+        val conversion =
+          getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion")
         conversion match {
           case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
-          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+          case Some(mapFunction: MapFunction[Row, A]) =>
+            plan.map(mapFunction)
+              .returns(tpe)
+              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+              .asInstanceOf[DataSet[A]]
         }
 
       case _ =>
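
Note the object-reuse pattern in the "Row to CRow" branch above: a single CRow is allocated in open() and only its row reference is swapped per record, so wrapping costs no per-record allocation. A standalone sketch of the same pattern, assuming the CRow sketch given earlier; the class name is illustrative:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.types.Row

    // Standalone version of the wrapping mapper above: every batch result
    // Row is emitted as an accumulate message (change = true).
    class RowToCRowWrapper extends RichMapFunction[Row, CRow] {

      private var outCRow: CRow = _

      override def open(parameters: Configuration): Unit = {
        outCRow = new CRow(null, true) // allocated once per task, then reused
      }

      override def map(value: Row): CRow = {
        outCRow.row = value // swap only the payload reference
        outCRow
      }
    }

Because the wrapper instance is reused, downstream operators that buffer records must copy them first, in line with Flink's usual object-reuse contract.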

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f532c5b..e2bccf3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -36,6 +37,8 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
+import org.apache.flink.table.runtime.CRowInputMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.types.Row
@@ -169,6 +172,52 @@ abstract class StreamTableEnvironment(
     }
   }
 
+
+  /**
+    * Creates a final converter that maps the internal row type to external type.
+    *
+    * @param physicalTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Does not need to be unique,
+    *                     but must be a valid Java class identifier.
+    */
+  override protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+  Option[MapFunction[IN, OUT]] = {
+
+    if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
+      // CRow to CRow, no conversion needed
+      None
+    } else if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+      // CRow to Row, only needs to be unwrapped
+      Some(
+        new MapFunction[CRow, Row] {
+          override def map(value: CRow): Row = value.row
+        }.asInstanceOf[MapFunction[IN, OUT]]
+      )
+    } else {
+      // Some type that is neither CRow nor Row
+
+      val converterFunction = generateRowConverterFunction[OUT](
+        physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+        logicalRowType,
+        requestedTypeInfo,
+        functionName
+      )
+
+      Some(new CRowInputMapRunner[OUT](
+        converterFunction.name,
+        converterFunction.code,
+        converterFunction.returnType)
+        .asInstanceOf[MapFunction[IN, OUT]])
+
+    }
+  }
+
   /**
     * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
     * catalog.
@@ -334,10 +383,15 @@ abstract class StreamTableEnvironment(
     logicalPlan match {
       case node: DataStreamRel =>
         val plan = node.translateToPlan(this)
-        val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
+        val conversion =
+          getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
         conversion match {
           case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
-          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+          case Some(mapFunction: MapFunction[CRow, A]) =>
+            plan.map(mapFunction)
+              .returns(tpe)
+              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+              .asInstanceOf[DataStream[A]]
         }
 
       case _ =>
@@ -355,9 +409,9 @@ abstract class StreamTableEnvironment(
   def explain(table: Table): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataStream = translate[Row](
+    val dataStream = translate[CRow](
       optimizedPlan,
-      ast.getRowType)(new GenericTypeInfo(classOf[Row]))
+      ast.getRowType)(new GenericTypeInfo(classOf[CRow]))
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bd974b0..0a37f00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -48,7 +48,7 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
@@ -56,7 +56,7 @@ import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.RelTable
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.runtime.types.CRowTypeInfo
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
@@ -615,6 +615,18 @@ abstract class TableEnvironment(val config: TableConfig) {
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
         }
+      case cr: CRowTypeInfo =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+            val idx = cr.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new TableException(s"$origName is not a field of type $cr")
+            }
+            (idx, name)
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
+        }
       case c: CaseClassTypeInfo[A] =>
         exprs.zipWithIndex.map {
           case (UnresolvedFieldReference(name), idx) => (idx, name)
@@ -660,37 +672,38 @@ abstract class TableEnvironment(val config: TableConfig) {
   /**
     * Creates a final converter that maps the internal row type to external type.
     *
-    * @param physicalRowTypeInfo the input of the sink
+    * @param physicalTypeInfo the input of the sink
     * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
     * @param requestedTypeInfo the output type of the sink
     * @param functionName name of the map function. Does not need to be unique,
     *                     but must be a valid Java class identifier.
     */
-  protected def sinkConversion[T](
-      physicalRowTypeInfo: TypeInformation[Row],
+  protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    Option[MapFunction[IN, OUT]]
+
+  protected def generateRowConverterFunction[OUT](
+      inputTypeInfo: TypeInformation[Row],
       logicalRowType: RelDataType,
-      requestedTypeInfo: TypeInformation[T],
-      functionName: String)
-    : Option[MapFunction[Row, T]] = {
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    GeneratedFunction[MapFunction[Row, OUT], OUT] = {
 
     // validate that at least the field types of physical and logical type match
     // we do that here to make sure that plan translation was correct
     val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
-    if (physicalRowTypeInfo != logicalRowTypeInfo) {
+    if (logicalRowTypeInfo != inputTypeInfo) {
       throw TableException("The field types of physical and logical row types do not match. " +
         "This is a bug and should not happen. Please file an issue.")
     }
 
-    // requested type is a generic Row, no conversion needed
-    if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
-          requestedTypeInfo.getTypeClass == classOf[Row]) {
-      return None
-    }
-
     // convert to type information
-    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
-      FlinkTypeFactory.toTypeInfo(relDataType.getType)
-    }
+    val logicalFieldTypes = logicalRowType.getFieldList.asScala
+      .map(t => FlinkTypeFactory.toTypeInfo(t.getType))
+
     // field names
     val logicalFieldNames = logicalRowType.getFieldNames.asScala
 
@@ -698,8 +711,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
       throw new TableException("Arity of result does not match requested type.")
     }
-    requestedTypeInfo match {
 
+    requestedTypeInfo match {
       // POJO type requested
       case pt: PojoTypeInfo[_] =>
         logicalFieldNames.zip(logicalFieldTypes) foreach {
@@ -746,7 +759,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     val generator = new CodeGenerator(
       config,
       false,
-      physicalRowTypeInfo,
+      inputTypeInfo,
       None,
       None)
 
@@ -760,20 +773,12 @@ abstract class TableEnvironment(val config: TableConfig) {
          |return ${conversion.resultTerm};
          |""".stripMargin
 
-    val genFunction = generator.generateFunction(
+    generator.generateFunction(
       functionName,
-      classOf[MapFunction[Row, T]],
+      classOf[MapFunction[Row, OUT]],
       body,
       requestedTypeInfo)
-
-    val mapFunction = new MapRunner[Row, T](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-    Some(mapFunction)
   }
-
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 96a7470..bec52ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -26,21 +26,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait CommonCalc {
+trait CommonCalc[T] {
 
-  private[flink] def functionBody(
+  private[flink] def generateFunction(
       generator: CodeGenerator,
+      ruleDescription: String,
       inputType: TypeInformation[Row],
       rowType: RelDataType,
       calcProgram: RexProgram,
-      config: TableConfig)
-    : String = {
+      config: TableConfig):
+    GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
 
     val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
 
@@ -53,7 +53,7 @@ trait CommonCalc {
       expandedExpressions)
 
     // only projection
-    if (condition == null) {
+    val body = if (condition == null) {
       s"""
         |${projection.code}
         |${generator.collectorTerm}.collect(${projection.resultTerm});
@@ -82,16 +82,12 @@ trait CommonCalc {
           |""".stripMargin
       }
     }
-  }
-
-  private[flink] def calcMapFunction(
-      genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row])
-    : RichFlatMapFunction[Row, Row] = {
 
-    new FlatMapRunner[Row, Row](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
+    generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Row, Row]],
+      body,
+      returnType)
   }
 
   private[flink] def conditionToString(

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 6c4066b..83a68c0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -23,12 +23,11 @@ import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
 import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
 import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
+import org.apache.flink.table.runtime.TableFunctionCollector
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
@@ -36,59 +35,12 @@ import scala.collection.JavaConverters._
 /**
   * Joins an input relation with a user-defined table function.
   */
-trait CommonCorrelate {
-
-  /**
-    * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
-    * and user-defined table function.
-    */
-  private[flink] def correlateMapFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Row],
-      udtfTypeInfo: TypeInformation[Any],
-      rowType: RelDataType,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      condition: Option[RexNode],
-      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
-      ruleDescription: String)
-    : CorrelateFlatMapRunner[Row, Row] = {
-
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
-
-    val flatMap = generateFunction(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      joinType,
-      rexCall,
-      pojoFieldMapping,
-      ruleDescription)
-
-    val collector = generateCollector(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      condition,
-      pojoFieldMapping)
-
-    new CorrelateFlatMapRunner[Row, Row](
-      flatMap.name,
-      flatMap.code,
-      collector.name,
-      collector.code,
-      flatMap.returnType)
-
-  }
+trait CommonCorrelate[T] {
 
   /**
     * Generates the flat map function to run the user-defined table function.
     */
-  private def generateFunction(
+  private[flink] def generateFunction(
       config: TableConfig,
       inputTypeInfo: TypeInformation[Row],
       udtfTypeInfo: TypeInformation[Any],
@@ -97,8 +49,8 @@ trait CommonCorrelate {
       joinType: SemiJoinType,
       rexCall: RexCall,
       pojoFieldMapping: Option[Array[Int]],
-      ruleDescription: String)
-    : GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+      ruleDescription: String):
+    GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
 
     val functionGenerator = new CodeGenerator(
       config,

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 0a0d204..5c44525 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -21,14 +21,13 @@ package org.apache.flink.table.plan.nodes
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.types.Row
 
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan {
+trait CommonScan[T] {
 
   /**
     * We check if the input type is exactly the same as the internal row type.
@@ -36,20 +35,17 @@ trait CommonScan {
     */
   private[flink] def needsConversion(
       externalTypeInfo: TypeInformation[Any],
-      internalTypeInfo: TypeInformation[Row])
-    : Boolean = {
-
+      internalTypeInfo: TypeInformation[T]): Boolean =
     externalTypeInfo != internalTypeInfo
-  }
 
-  private[flink] def getConversionMapper(
+  private[flink] def generateConversionFunction(
       config: TableConfig,
       inputType: TypeInformation[Any],
       expectedType: TypeInformation[Row],
       conversionOperatorName: String,
       fieldNames: Seq[String],
-      inputPojoFieldMapping: Option[Array[Int]] = None)
-    : MapFunction[Any, Row] = {
+      inputPojoFieldMapping: Option[Array[Int]] = None):
+    GeneratedFunction[MapFunction[Any, Row], Row] = {
 
     val generator = new CodeGenerator(
       config,
@@ -65,17 +61,11 @@ trait CommonScan {
          |return ${conversion.resultTerm};
          |""".stripMargin
 
-    val genFunction = generator.generateFunction(
+    generator.generateFunction(
       conversionOperatorName,
       classOf[MapFunction[Any, Row]],
       body,
       expectedType)
-
-    new MapRunner[Any, Row](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index b39b8ed..64286f0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -23,12 +23,13 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait BatchScan extends CommonScan with DataSetRel {
+trait BatchScan extends CommonScan[Row] with DataSetRel {
 
   protected def convertToInternalRow(
       input: DataSet[Any],
@@ -43,7 +44,7 @@ trait BatchScan extends CommonScan with DataSetRel {
     // conversion
     if (needsConversion(inputType, internalType)) {
 
-      val mapFunc = getConversionMapper(
+      val function = generateConversionFunction(
         config,
         inputType,
         internalType,
@@ -51,6 +52,11 @@ trait BatchScan extends CommonScan with DataSetRel {
         getRowType.getFieldNames,
         Some(flinkTable.fieldIndexes))
 
+      val mapFunc = new MapRunner[Any, Row](
+        function.name,
+        function.code,
+        function.returnType)
+
       val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
       input.map(mapFunc).name(opName)

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index e05b5a8..359f5a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -24,12 +24,15 @@ import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
 /**
@@ -44,7 +47,7 @@ class DataSetCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
-  with CommonCalc
+  with CommonCalc[Row]
   with DataSetRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -83,24 +86,22 @@ class DataSetCalc(
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val returnType = FlinkTypeFactory
+      .toInternalRowTypeInfo(getRowType)
+      .asInstanceOf[RowTypeInfo]
 
     val generator = new CodeGenerator(config, false, inputDS.getType)
 
-    val body = functionBody(
+    val genFunction = generateFunction(
       generator,
+      ruleDescription,
       inputDS.getType,
       getRowType,
       calcProgram,
       config)
 
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Row, Row]],
-      body,
-      returnType)
+    val mapFunc = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
 
-    val mapFunc = calcMapFunction(genFunction)
     inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 2a62e21..1ac6f6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -26,9 +26,11 @@ import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.CorrelateFlatMapRunner
 import org.apache.flink.types.Row
 
 /**
@@ -45,7 +47,7 @@ class DataSetCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
-  with CommonCorrelate
+  with CommonCorrelate[Row]
   with DataSetRel {
 
   override def deriveRowType() = relRowType
@@ -95,20 +97,38 @@ class DataSetCorrelate(
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+    val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
-    val mapFunc = correlateMapFunction(
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+    val flatMap = generateFunction(
       config,
       inputDS.getType,
       udtfTypeInfo,
-      getRowType,
+      returnType,
+      rowType,
       joinType,
       rexCall,
-      condition,
-      Some(pojoFieldMapping),
+      pojoFieldMapping,
       ruleDescription)
 
+    val collector = generateCollector(
+      config,
+      inputDS.getType,
+      udtfTypeInfo,
+      returnType,
+      rowType,
+      condition,
+      pojoFieldMapping)
+
+    val mapFunc = new CorrelateFlatMapRunner[Row, Row](
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      flatMap.returnType)
+
     inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index 3ebee2c..948dd27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -88,7 +88,7 @@ class DataSetValues(
       generatedRecords.map(_.code),
       returnType)
 
-    val inputFormat = new ValuesInputFormat[Row](
+    val inputFormat = new ValuesInputFormat(
       generatedFunction.name,
       generatedFunction.code,
       generatedFunction.returnType)

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index b015a1d..0bf723d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -24,12 +24,15 @@ import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.table.runtime.CRowFlatMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 
 /**
@@ -44,7 +47,7 @@ class DataStreamCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
-  with CommonCalc
+  with CommonCalc[CRow]
   with DataStreamRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -75,28 +78,29 @@ class DataStreamCalc(
     estimateRowCount(calcProgram, rowCnt)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
+    val outputRowType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-    val generator = new CodeGenerator(config, false, inputDataStream.getType)
+    val generator = new CodeGenerator(config, false, inputRowType)
 
-    val body = functionBody(
+    val genFunction = generateFunction(
       generator,
-      inputDataStream.getType,
+      ruleDescription,
+      inputRowType,
       getRowType,
       calcProgram,
       config)
 
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Row, Row]],
-      body,
-      FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
+    val mapFunc = new CRowFlatMapRunner(
+      genFunction.name,
+      genFunction.code,
+      CRowTypeInfo(outputRowType))
 
-    val mapFunc = calcMapFunction(genFunction)
     inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
 }
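
DataStreamCalc now emits CRow while the generated function still maps Row to Row; the gap is bridged by CRowFlatMapRunner and CRowWrappingCollector, both added by this commit (see the file summary) but truncated from this excerpt. A hedged sketch of the bridging idea follows; unlike the real runner, which compiles the generated function from its name/code strings and caches a wrapping collector, the sketch takes the inner function directly, and all names are illustrative:

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Illustrative bridge from a generated Row-to-Row function onto a CRow
    // stream: unwrap the input, run the inner function, and re-wrap every
    // result with the change flag of the input record, so a filtered or
    // projected retraction stays a retraction.
    class CRowFlatMapBridge(inner: FlatMapFunction[Row, Row])
      extends FlatMapFunction[CRow, CRow] {

      @transient private var outCRow: CRow = _

      override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
        if (outCRow == null) {
          outCRow = new CRow(null, true)
        }
        outCRow.change = in.change
        inner.flatMap(in.row, new Collector[Row] {
          override def collect(record: Row): Unit = {
            outCRow.row = record
            out.collect(outCRow)
          }
          override def close(): Unit = out.close()
        })
      }
    }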

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 7680904..dff5099 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -25,10 +25,12 @@ import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   * Flink RelNode which joins the input with a user-defined table function.
@@ -44,7 +46,7 @@ class DataStreamCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
-  with CommonCorrelate
+  with CommonCorrelate[CRow]
   with DataStreamRel {
 
   override def deriveRowType() = relRowType
@@ -79,30 +81,50 @@ class DataStreamCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     // we do not need to specify input type
     val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
 
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+    val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
-    val mapFunc = correlateMapFunction(
+    val rowType = inputType.rowType
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(relRowType)
+
+    val flatMap = generateFunction(
       config,
-      inputDS.getType,
+      rowType,
       udtfTypeInfo,
+      returnType,
       getRowType,
       joinType,
       rexCall,
-      condition,
-      Some(pojoFieldMapping),
+      pojoFieldMapping,
       ruleDescription)
 
+    val collector = generateCollector(
+      config,
+      rowType,
+      udtfTypeInfo,
+      returnType,
+      getRowType,
+      condition,
+      pojoFieldMapping)
+
+    val mapFunc = new CRowCorrelateFlatMapRunner(
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      CRowTypeInfo(flatMap.returnType))
+
     inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index d88c72b..14ae343 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -28,8 +28,9 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.plan.nodes.CommonAggregate
-import org.apache.flink.types.Row
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   *
@@ -91,11 +92,10 @@ class DataStreamGroupAggregate(
       .item("select", aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val outRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val generator = new CodeGenerator(
       tableEnv.getConfig,
@@ -117,28 +117,30 @@ class DataStreamGroupAggregate(
       generator,
       namedAggregates,
       inputType,
-      groupings)
+      groupings,
+      DataStreamRetractionRules.isAccRetract(this),
+      DataStreamRetractionRules.isAccRetract(getInput))
 
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // grouped / keyed aggregation
       if (groupings.nonEmpty) {
         inputDS
         .keyBy(groupings: _*)
         .process(processFunction)
-        .returns(rowTypeInfo)
+        .returns(outRowType)
         .name(keyedAggOpName)
-        .asInstanceOf[DataStream[Row]]
+        .asInstanceOf[DataStream[CRow]]
       }
       // global / non-keyed aggregation
       else {
         inputDS
-        .keyBy(new NullByteKeySelector[Row])
+        .keyBy(new NullByteKeySelector[CRow])
         .process(processFunction)
         .setParallelism(1)
         .setMaxParallelism(1)
-        .returns(rowTypeInfo)
+        .returns(outRowType)
         .name(nonKeyedAggOpName)
-        .asInstanceOf[DataStream[Row]]
+        .asInstanceOf[DataStream[CRow]]
       }
     result
   }
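
The two new arguments passed to createGroupAggregateProcessFunction above, isAccRetract(this) and isAccRetract(getInput), tell the generated function whether to produce and whether to consume retraction messages; GroupAggProcessFunction itself is diffed later in this commit but not shown in this excerpt. A hedged sketch of the emission protocol in AccRetract mode, assuming the CRow sketch from above; the names and state handling are illustrative, not the committed code:

    import org.apache.flink.types.Row

    object RetractionProtocol {

      // Illustrative emission protocol for an updating, non-windowed
      // aggregate: an update first retracts the previously emitted result,
      // then accumulates the new one. For a running count per key on the
      // input records (a), (a) the emitted CRows are:
      //   +(a,1)           after the first record
      //   -(a,1), +(a,2)   after the second record
      def emitUpdate(
          prevRow: Row,                // last emitted result, null if none
          newRow: Row,
          generateRetraction: Boolean, // corresponds to isAccRetract(this)
          out: CRow => Unit): Unit = {
        if (generateRetraction && prevRow != null) {
          out(new CRow(prevRow, false)) // retract the stale result
        }
        out(new CRow(newRow, true))     // accumulate the updated result
      }
    }

This pairing is also why the windowed aggregate and OVER-window translations below throw a TableException when their input is in AccRetract mode: they cannot yet interpret the retract messages such an upstream aggregate produces.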

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 8959b23..d503792 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream,
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
@@ -35,11 +35,12 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-import org.apache.flink.types.Row
 
 class DataStreamGroupWindowAggregate(
     window: LogicalWindow,
@@ -103,11 +104,20 @@ class DataStreamGroupWindowAggregate(
           namedProperties))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on windowed GroupBy aggregation is not supported yet. " +
+          "Note: Windowed GroupBy aggregation should not follow a " +
+          "non-windowed GroupBy aggregation.")
+    }
+
+    val outRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val aggString = aggregationToString(
       inputType,
@@ -138,17 +148,19 @@ class DataStreamGroupWindowAggregate(
       val keyedStream = inputDS.keyBy(grouping: _*)
       val windowedStream =
         createKeyedWindowedStream(window, keyedStream)
-          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+          .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
           generator,
           namedAggregates,
           inputType,
-          rowRelDataType)
+          rowRelDataType,
+          grouping
+        )
 
       windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
         .name(keyedAggOpName)
     }
     // global / non-keyed aggregation
@@ -160,17 +172,19 @@ class DataStreamGroupWindowAggregate(
 
       val windowedStream =
         createNonKeyedWindowedStream(window, inputDS)
-          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+          .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
           generator,
           namedAggregates,
           inputType,
-          rowRelDataType)
+          rowRelDataType,
+          Array[Int]()
+        )
 
       windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
         .name(nonKeyedAggOpName)
     }
   }
@@ -178,9 +192,10 @@ class DataStreamGroupWindowAggregate(
 
 object DataStreamGroupWindowAggregate {
 
-
-  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
-    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+  private def createKeyedWindowedStream(
+      groupWindow: LogicalWindow,
+      stream: KeyedStream[CRow, Tuple]):
+    WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match {
 
     case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
       stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
@@ -221,8 +236,10 @@ object DataStreamGroupWindowAggregate {
       stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
   }
 
-  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
-    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+  private def createNonKeyedWindowedStream(
+      groupWindow: LogicalWindow,
+      stream: DataStream[CRow]):
+    AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match {
 
     case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
       stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 031d533..a0d10a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -25,18 +25,17 @@ import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.OverAggregate
 import org.apache.flink.table.runtime.aggregate._
-import org.apache.flink.types.Row
-
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 class DataStreamOverAggregate(
     logicWindow: Window,
@@ -87,7 +86,7 @@ class DataStreamOverAggregate(
           namedAggregates))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     if (logicWindow.groups.size > 1) {
       throw new TableException(
         "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
@@ -110,6 +109,8 @@ class DataStreamOverAggregate(
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
@@ -120,6 +121,12 @@ class DataStreamOverAggregate(
       .get(orderKey.getFieldIndex)
       .getValue
 
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on Over window aggregation is not supported yet. " +
+          "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
+    }
+
     timeType match {
       case _: ProcTimeType =>
         // proc-time OVER window
@@ -138,8 +145,7 @@ class DataStreamOverAggregate(
             generator,
             inputDS,
             isRowTimeType = false,
-            isRowsClause = overWindow.isRows
-          )
+            isRowsClause = overWindow.isRows)
         } else {
           throw new TableException(
             "OVER RANGE FOLLOWING windows are not supported yet.")
@@ -153,16 +159,14 @@ class DataStreamOverAggregate(
             generator,
             inputDS,
             isRowTimeType = true,
-            isRowsClause = overWindow.isRows
-          )
+            isRowsClause = overWindow.isRows)
         } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
           createBoundedAndCurrentRowOverWindow(
             generator,
             inputDS,
             isRowTimeType = true,
-            isRowsClause = overWindow.isRows
-            )
+            isRowsClause = overWindow.isRows)
         } else {
           throw new TableException(
             "OVER RANGE FOLLOWING windows are not supported yet.")
@@ -177,16 +181,16 @@ class DataStreamOverAggregate(
 
   def createUnboundedAndCurrentRowOverWindow(
     generator: CodeGenerator,
-    inputDS: DataStream[Row],
+    inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
-    isRowsClause: Boolean): DataStream[Row] = {
+    isRowsClause: Boolean): DataStream[CRow] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
     // get the output types
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+    val returnTypeInfo = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
       generator,
@@ -196,30 +200,28 @@ class DataStreamOverAggregate(
       partitionKeys.nonEmpty,
       isRowsClause)
 
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
-          .returns(rowTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
+          .asInstanceOf[DataStream[CRow]]
       }
       // non-partitioned aggregation
       else {
         if (isRowTimeType) {
-          inputDS.keyBy(new NullByteKeySelector[Row])
+          inputDS.keyBy(new NullByteKeySelector[CRow])
             .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(rowTypeInfo)
+            .returns(returnTypeInfo)
             .name(aggOpName)
-            .asInstanceOf[DataStream[Row]]
         } else {
           inputDS
             .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(rowTypeInfo)
+            .returns(returnTypeInfo)
             .name(aggOpName)
-            .asInstanceOf[DataStream[Row]]
         }
       }
     result
@@ -227,9 +229,9 @@ class DataStreamOverAggregate(
 
   def createBoundedAndCurrentRowOverWindow(
     generator: CodeGenerator,
-    inputDS: DataStream[Row],
+    inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
-    isRowsClause: Boolean): DataStream[Row] = {
+    isRowsClause: Boolean): DataStream[CRow] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -239,9 +241,9 @@ class DataStreamOverAggregate(
       getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0)
 
     // get the output types
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+    val returnTypeInfo = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
-    val processFunction = AggregateUtil.createBoundedOverProcessFunction(
+    val processFunction = AggregateUtil.createBoundedOverProcessFunction[CRow](
       generator,
       namedAggregates,
       inputType,
@@ -249,24 +251,22 @@ class DataStreamOverAggregate(
       isRowsClause,
       isRowTimeType
     )
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
-          .returns(rowTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
       }
       // non-partitioned aggregation
       else {
         inputDS
-          .keyBy(new NullByteKeySelector[Row])
+          .keyBy(new NullByteKeySelector[CRow])
           .process(processFunction).setParallelism(1).setMaxParallelism(1)
-          .returns(rowTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
       }
     result
   }

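The consumeRetraction guard above rejects plans whose input operates in
AccMode.AccRetract. A hypothetical query that would hit the guard (table,
field, and time-attribute names are invented, and the exact time-attribute
syntax of this Flink version may differ):

    // The inner non-windowed GROUP BY emits updates as retractions
    // (AccRetract), which the OVER aggregation cannot consume yet, so
    // translateToPlan throws the TableException added above.
    val sql =
      """
        |SELECT user_id, SUM(cnt) OVER (
        |  PARTITION BY user_id ORDER BY proctime
        |  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
        |FROM (SELECT user_id, COUNT(*) AS cnt
        |      FROM clicks GROUP BY user_id)
        |""".stripMargin
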
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 50d1d06..5c009a1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.nodes.FlinkRelNode
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.CRow
 
 trait DataStreamRel extends FlinkRelNode {
 
@@ -29,9 +29,9 @@ trait DataStreamRel extends FlinkRelNode {
     * Translates the FlinkRelNode into a Flink operator.
     *
     * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
-    * @return DataStream of type [[Row]]
+    * @return DataStream of type [[CRow]]
     */
-  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
+  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow]
 
   /**
     * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index c187ae8..1a43edb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 
 /**
@@ -53,7 +54,7 @@ class DataStreamScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
     convertToInternalRow(inputDataStream, dataStreamTable, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index f340ac7..68e4e6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
@@ -60,7 +61,7 @@ class DataStreamUnion(
     s"Union All(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 0ab4a48..5e8d127 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.io.CRowValuesInputFormat
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 import scala.collection.JavaConverters._
 
@@ -57,11 +57,11 @@ class DataStreamValues(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val returnType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val generator = new CodeGenerator(config)
 
@@ -77,12 +77,12 @@ class DataStreamValues(
     val generatedFunction = generator.generateValuesInputFormat(
       ruleDescription,
       generatedRecords.map(_.code),
-      returnType)
+      returnType.rowType)
 
-    val inputFormat = new ValuesInputFormat[Row](
+    val inputFormat = new CRowValuesInputFormat(
       generatedFunction.name,
       generatedFunction.code,
-      generatedFunction.returnType)
+      returnType)
 
     tableEnv.execEnv.createInput(inputFormat, returnType)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 6d08302..02716cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -23,41 +23,46 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.CRowOutputMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait StreamScan extends CommonScan with DataStreamRel {
+trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
   protected def convertToInternalRow(
       input: DataStream[Any],
       flinkTable: FlinkTable[_],
       config: TableConfig)
-    : DataStream[Row] = {
+    : DataStream[CRow] = {
 
     val inputType = input.getType
-
-    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val internalType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     // conversion
     if (needsConversion(inputType, internalType)) {
 
-      val mapFunc = getConversionMapper(
+      val function = generateConversionFunction(
         config,
         inputType,
-        internalType,
+        internalType.rowType,
         "DataStreamSourceConversion",
         getRowType.getFieldNames,
         Some(flinkTable.fieldIndexes))
 
+      val mapFunc = new CRowOutputMapRunner(
+        function.name,
+        function.code,
+        internalType)
+
       val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-      input.map(mapFunc).name(opName)
+      input.map(mapFunc).name(opName).returns(internalType)
     }
     // no conversion necessary, forward
     else {
-      input.asInstanceOf[DataStream[Row]]
+      input.asInstanceOf[DataStream[CRow]]
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 0a466a3..44ad706 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
-import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -64,7 +64,7 @@ class StreamTableSourceScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
     convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
index ec90392..0ca079e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -34,7 +34,7 @@ class FlinkLogicalCalc(
     calcProgram: RexProgram)
   extends Calc(cluster, traitSet, input, calcProgram)
   with FlinkLogicalRel
-  with CommonCalc {
+  with CommonCalc[Any] {
 
   override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
     new FlinkLogicalCalc(cluster, traitSet, child, program)

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
index aeb67b6..bd9a7ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -82,6 +82,14 @@ object DataStreamRetractionRules {
   }
 
   /**
+    * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
+    */
+  def isAccRetract(node: RelNode): Boolean = {
+    val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
+    null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
+  }
+
+  /**
     * Rule that assigns the default retraction information to [[DataStreamRel]] nodes.
    * The defaults are to not publish updates as retraction messages and to use [[AccMode.Acc]].
     */
@@ -190,14 +198,6 @@ object DataStreamRetractionRules {
     }
 
     /**
-      * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
-      */
-    def isAccRetract(node: RelNode): Boolean = {
-      val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
-      null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
-    }
-
-    /**
       * Set [[AccMode.AccRetract]] to a [[RelNode]].
       */
     def setAccRetract(relNode: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
new file mode 100644
index 0000000..66e51b1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * A CorrelateFlatMapRunner with [[CRow]] input and [[CRow]] output.
+  */
+class CRowCorrelateFlatMapRunner(
+    flatMapName: String,
+    flatMapCode: String,
+    collectorName: String,
+    collectorCode: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichFlatMapFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[Any] {
+
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatMapFunction[Row, Row] = _
+  private var collector: TableFunctionCollector[_] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
+    LOG.debug("Instantiating TableFunctionCollector.")
+    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
+    this.cRowWrapper = new CRowWrappingCollector()
+
+    LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n Code:\n$flatMapCode")
+    val flatMapClazz = compile(getRuntimeContext.getUserCodeClassLoader, flatMapName, flatMapCode)
+    val constructor = flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]])
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = constructor.newInstance(collector).asInstanceOf[FlatMapFunction[Row, Row]]
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+  }
+
+  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+
+    collector.setCollector(cRowWrapper)
+    collector.setInput(in.row)
+    collector.reset()
+
+    function.flatMap(in.row, cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}

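The runner above hands the wrapped function a CRowWrappingCollector (added
elsewhere in this commit). A sketch of the collector shape that the calls
cRowWrapper.out = out and cRowWrapper.setChange(in.change) assume; details of
the committed class may differ:

    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Sketch: every Row emitted by the wrapped function is put into a single
    // reused CRow carrying the change flag of the current input record.
    class CRowWrappingCollector extends Collector[Row] {
      var out: Collector[CRow] = _
      private val outCRow: CRow = new CRow(null, true)

      def setChange(change: Boolean): Unit = outCRow.change = change

      override def collect(record: Row): Unit = {
        outCRow.row = record
        out.collect(outCRow)
      }

      override def close(): Unit = out.close()
    }
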
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
new file mode 100644
index 0000000..9a4650b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * FlatMapRunner with [[CRow]] input and [[CRow]] output. The change flag of
+  * the input record is forwarded to all emitted records.
+  */
+class CRowFlatMapRunner(
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichFlatMapFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[FlatMapFunction[Row, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatMapFunction[Row, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = clazz.newInstance()
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+
+    this.cRowWrapper = new CRowWrappingCollector()
+  }
+
+  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+    function.flatMap(in.row, cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}

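A hedged usage sketch for the runner, not taken from this diff: how a physical
node such as DataStreamCalc might wrap its generated function. The names
genFunction, rowTypeInfo, and inputDS are assumptions standing in for the code
generator's output and the translated input:

    // genFunction: GeneratedFunction from the CodeGenerator;
    // rowTypeInfo: the node's internal row type; inputDS: DataStream[CRow].
    val runner = new CRowFlatMapRunner(
      genFunction.name,
      genFunction.code,
      CRowTypeInfo(rowTypeInfo))

    val result: DataStream[CRow] = inputDS.flatMap(runner).name("select")
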
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
new file mode 100644
index 0000000..8e95c93
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+  * MapRunner with [[CRow]] input. Unwraps the [[Row]] payload before
+  * delegating to the compiled [[MapFunction]].
+  */
+class CRowInputMapRunner[OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[CRow, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[MapFunction[Row, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def map(in: CRow): OUT = {
+    function.map(in.row)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}

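A hedged sketch of the intended use, with assumed names: unwrapping the
translated CRow stream back to the external row type when results are handed
to the user (converter stands in for a generated MapFunction[Row, Row],
rowTypeInfo for the requested external type):

    val mapRunner = new CRowInputMapRunner[Row](
      converter.name,
      converter.code,
      rowTypeInfo)

    // getProducedType supplies rowTypeInfo, so no extra returns() is needed.
    val external: DataStream[Row] = cRowStream.map(mapRunner)
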
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
new file mode 100644
index 0000000..966dea9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+  * MapRunner with [[CRow]] output. Wraps each produced [[Row]] into a
+  * reused [[CRow]] whose change flag is always true.
+  */
+class CRowOutputMapRunner(
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichMapFunction[Any, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[MapFunction[Any, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Any, Row] = _
+  private var outCRow: CRow = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+    outCRow = new CRow(null, true)
+  }
+
+  override def map(in: Any): CRow = {
+    outCRow.row = function.map(in)
+    outCRow
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+}
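
Design note: like the other runners in this commit, CRowOutputMapRunner reuses
a single mutable CRow across map() calls instead of allocating one per record,
in line with Flink's object-reuse conventions for hot paths. The change flag
defaults to true because a freshly converted source stream carries only
accumulate messages.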