Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:45 UTC

[11/15] flink git commit: [FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
new file mode 100644
index 0000000..8e95c93
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+  * MapRunner with [[CRow]] input.
+  */
+class CRowInputMapRunner[OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[CRow, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[MapFunction[Row, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def map(in: CRow): OUT = {
+    function.map(in.row)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
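
The runner above unwraps each [[CRow]] before calling the generated MapFunction, and CRowOutputMapRunner below does the inverse. For context, a minimal sketch of what this patch treats a CRow as — a mutable pairing of a Row with an accumulate/retract flag (the real class lives in org.apache.flink.table.runtime.types and is added elsewhere in this commit series):

    import org.apache.flink.types.Row

    // Sketch only: a change-annotated row. change = true marks an accumulate
    // (insert) message, change = false marks a retract (delete) message.
    class CRow(var row: Row, var change: Boolean) {
      def this() = this(null, true)
    }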

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
new file mode 100644
index 0000000..966dea9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+  * MapRunner with [[CRow]] output.
+  */
+class CRowOutputMapRunner(
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichMapFunction[Any, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[MapFunction[Any, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Any, Row] = _
+  private var outCRow: CRow = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+    outCRow = new CRow(null, true)
+  }
+
+  override def map(in: Any): CRow = {
+    outCRow.row = function.map(in)
+    outCRow
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
new file mode 100644
index 0000000..b2b062e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * The collector wraps a [[Row]] into a [[CRow]].
+  */
+class CRowWrappingCollector() extends Collector[Row] {
+
+  var out: Collector[CRow] = _
+  val outCRow: CRow = new CRow()
+
+  def setChange(change: Boolean): Unit = this.outCRow.change = change
+
+  override def collect(record: Row): Unit = {
+    outCRow.row = record
+    out.collect(outCRow)
+  }
+
+  override def close(): Unit = out.close()
+}
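
A hedged usage sketch of the collector: generated functions emit plain Rows, and the wrapper stamps each one with the current change flag, forwarding a single reused CRow instance (safe under Flink's object-reuse contract, assuming downstream consumes or copies the record before the next collect):

    import org.apache.flink.table.runtime.CRowWrappingCollector
    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    object WrappingDemo extends App {
      // Toy downstream collector that prints what it receives.
      val downstream = new Collector[CRow] {
        override def collect(r: CRow): Unit = println(s"${r.row} change=${r.change}")
        override def close(): Unit = ()
      }

      val wrapper = new CRowWrappingCollector()
      wrapper.out = downstream
      wrapper.setChange(false)  // mark subsequent rows as retractions

      val row = new Row(1)
      row.setField(0, "a")
      wrapper.collect(row)      // prints: a change=false
    }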

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
index b446306..2e37baf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
@@ -24,20 +24,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory
 
-class FlatMapRunner[IN, OUT](
+class FlatMapRunner(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT])
-  extends RichFlatMapFunction[IN, OUT]
-  with ResultTypeQueryable[OUT]
-  with Compiler[FlatMapFunction[IN, OUT]] {
+    @transient returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row]
+  with Compiler[FlatMapFunction[Row, Row]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  private var function: FlatMapFunction[IN, OUT] = _
+  private var function: FlatMapFunction[Row, Row] = _
 
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
@@ -48,10 +49,10 @@ class FlatMapRunner[IN, OUT](
     FunctionUtils.openFunction(function, parameters)
   }
 
-  override def flatMap(in: IN, out: Collector[OUT]): Unit =
+  override def flatMap(in: Row, out: Collector[Row]): Unit =
     function.flatMap(in, out)
 
-  override def getProducedType: TypeInformation[OUT] = returnType
+  override def getProducedType: TypeInformation[Row] = returnType
 
   override def close(): Unit = {
     FunctionUtils.closeFunction(function)

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 377e0ff..dd9c015 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.AggregateFunction
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.slf4j.LoggerFactory
 
@@ -30,28 +31,28 @@ import org.slf4j.LoggerFactory
   * @param genAggregations Generated aggregate helper function
   */
 class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
-  extends AggregateFunction[Row, Row, Row] with Compiler[GeneratedAggregations] {
+  extends AggregateFunction[CRow, Row, Row] with Compiler[GeneratedAggregations] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def createAccumulator(): Row = {
     if (function == null) {
-      initFunction
+      initFunction()
     }
     function.createAccumulators()
   }
 
-  override def add(value: Row, accumulatorRow: Row): Unit = {
+  override def add(value: CRow, accumulatorRow: Row): Unit = {
     if (function == null) {
-      initFunction
+      initFunction()
     }
-    function.accumulate(accumulatorRow, value)
+    function.accumulate(accumulatorRow, value.row)
   }
 
   override def getResult(accumulatorRow: Row): Row = {
     if (function == null) {
-      initFunction
+      initFunction()
     }
     val output = function.createOutputRow()
     function.setAggregationResults(accumulatorRow, output)
@@ -60,7 +61,7 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
 
   override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = {
     if (function == null) {
-      initFunction
+      initFunction()
     }
     function.mergeAccumulatorsPair(aAccumulatorRow, bAccumulatorRow)
   }
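
Unlike the Rich* runners above, this class extends Flink's plain AggregateFunction interface, which offers no open() lifecycle hook; the generated aggregation code is therefore compiled lazily, guarded by the null check repeated in every method. The idiom in isolation, as a generic sketch (the real initFunction compiles genAggregations.code):

    // Sketch of the lazy-initialization guard: the expensive step runs once,
    // triggered by whichever method is called first after deserialization.
    class LazilyCompiled(code: String) extends Serializable {
      @transient private var function: String => String = _

      private def initFunction(): Unit = {
        // stand-in for compile(classLoader, name, code).newInstance()
        function = (s: String) => s + code
      }

      def apply(s: String): String = {
        if (function == null) {
          initFunction()
        }
        function(s)
      }
    }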

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 5e9efd0..768c9cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -44,6 +44,7 @@ import org.apache.flink.table.functions.utils.AggSqlFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction}
 import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 import org.apache.flink.types.Row
@@ -79,7 +80,7 @@ object AggregateUtil {
       isRowTimeType: Boolean,
       isPartitioned: Boolean,
       isRowsClause: Boolean)
-    : ProcessFunction[Row, Row] = {
+    : ProcessFunction[CRow, CRow] = {
 
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
@@ -116,13 +117,13 @@ object AggregateUtil {
         new RowTimeUnboundedRowsOver(
           genFunction,
           aggregationStateType,
-          inputTypeInfo)
+          CRowTypeInfo(inputTypeInfo))
       } else {
         // RANGE unbounded over process function
         new RowTimeUnboundedRangeOver(
           genFunction,
           aggregationStateType,
-          inputTypeInfo)
+          CRowTypeInfo(inputTypeInfo))
       }
     } else {
       if (isPartitioned) {
@@ -153,13 +154,16 @@ object AggregateUtil {
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputRowType: RelDataType,
       inputFieldTypes: Seq[TypeInformation[_]],
-      groupings: Array[Int]): ProcessFunction[Row, Row] = {
+      groupings: Array[Int],
+      generateRetraction: Boolean,
+      consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
 
     val (aggFields, aggregates) =
       transformToAggregateFunctions(
         namedAggregates.map(_.getKey),
         inputRowType,
-        needRetraction = false)
+        consumeRetraction)
+
     val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
 
     val outputArity = groupings.length + aggregates.length
@@ -178,14 +182,16 @@ object AggregateUtil {
       None,
       None,
       outputArity,
-      needRetract = false,
+      consumeRetraction,
       needMerge = false,
       needReset = false
     )
 
     new GroupAggProcessFunction(
       genFunction,
-      aggregationStateType)
+      aggregationStateType,
+      generateRetraction)
+
   }
 
   /**
@@ -198,7 +204,7 @@ object AggregateUtil {
     * @param inputTypeInfo Physical type information of the row.
     * @param inputFieldTypeInfo Physical type information of the row's fields.
     * @param precedingOffset the preceding offset
-    * @param isRowsClause   It is a tag that indicates whether the OVER clause is ROWS clause
+    * @param isRowsClause    It is a tag that indicates whether the OVER clause is a ROWS clause
     * @param isRowTimeType   It is a tag that indicates whether the time type is rowTimeType
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
@@ -211,7 +217,7 @@ object AggregateUtil {
       precedingOffset: Long,
       isRowsClause: Boolean,
       isRowTimeType: Boolean)
-    : ProcessFunction[Row, Row] = {
+    : ProcessFunction[CRow, CRow] = {
 
     val needRetract = true
     val (aggFields, aggregates) =
@@ -221,6 +227,7 @@ object AggregateUtil {
         needRetract)
 
     val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+    val inputRowType = CRowTypeInfo(inputTypeInfo)
 
     val forwardMapping = (0 until inputType.getFieldCount).toArray
     val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -248,14 +255,14 @@ object AggregateUtil {
         new RowTimeBoundedRowsOver(
           genFunction,
           aggregationStateType,
-          inputTypeInfo,
+          inputRowType,
           precedingOffset
         )
       } else {
         new RowTimeBoundedRangeOver(
           genFunction,
           aggregationStateType,
-          inputTypeInfo,
+          inputRowType,
           precedingOffset
         )
       }
@@ -265,13 +272,13 @@ object AggregateUtil {
           genFunction,
           precedingOffset,
           aggregationStateType,
-          inputTypeInfo)
+          inputRowType)
       } else {
         new ProcTimeBoundedRangeOver(
           genFunction,
           precedingOffset,
           aggregationStateType,
-          inputTypeInfo)
+          inputRowType)
       }
     }
   }
@@ -932,7 +939,7 @@ object AggregateUtil {
       window: LogicalWindow,
       finalRowArity: Int,
       properties: Seq[NamedWindowProperty])
-    : AllWindowFunction[Row, Row, DataStreamWindow] = {
+    : AllWindowFunction[Row, CRow, DataStreamWindow] = {
 
     if (isTimeWindow(window)) {
       val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
@@ -940,7 +947,7 @@ object AggregateUtil {
         startPos,
         endPos,
         finalRowArity)
-        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+        .asInstanceOf[AllWindowFunction[Row, CRow, DataStreamWindow]]
     } else {
       new IncrementalAggregateAllWindowFunction(
         finalRowArity)
@@ -955,8 +962,8 @@ object AggregateUtil {
       numGroupingKeys: Int,
       numAggregates: Int,
       finalRowArity: Int,
-      properties: Seq[NamedWindowProperty])
-    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+      properties: Seq[NamedWindowProperty]):
+    WindowFunction[Row, CRow, Tuple, DataStreamWindow] = {
 
     if (isTimeWindow(window)) {
       val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
@@ -966,7 +973,7 @@ object AggregateUtil {
         startPos,
         endPos,
         finalRowArity)
-        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+        .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]]
     } else {
       new IncrementalAggregateWindowFunction(
         numGroupingKeys,
@@ -981,8 +988,9 @@ object AggregateUtil {
       inputType: RelDataType,
       inputFieldTypeInfo: Seq[TypeInformation[_]],
       outputType: RelDataType,
+      groupingKeys: Array[Int],
       needMerge: Boolean)
-    : (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = {
+    : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo, RowTypeInfo) = {
 
     val needRetract = false
     val (aggFields, aggregates) =
@@ -1002,7 +1010,7 @@ object AggregateUtil {
       aggFields,
       aggMapping,
       partialResults = false,
-      Array(), // no fields are forwarded
+      groupingKeys,
       None,
       None,
       outputArity,
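
Two threads run through the AggregateUtil changes: over-window process functions are retyped from Row to CRow, with their keyed state still built on the inner Row type (hence the CRowTypeInfo(...).rowType unwrapping), and the non-windowed group aggregate gains generateRetraction/consumeRetraction flags that flow into code generation and into GroupAggProcessFunction. A small self-contained sketch of the type wrapping, assuming the companion apply and rowType accessor used above:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.runtime.types.CRowTypeInfo

    object CRowTypeDemo extends App {
      // Physical row schema: (user: String, amount: Long)
      val rowType = new RowTypeInfo(STRING_TYPE_INFO, LONG_TYPE_INFO)

      // Wrap it so process functions are typed on CRow ...
      val cRowType = CRowTypeInfo(rowType)

      // ... and unwrap it again where state stores only the inner Row.
      assert(cRowType.rowType == rowType)
    }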

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index 95699a2..fabf200 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -56,7 +56,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
   extends RichGroupReduceFunction[Row, Row]
     with Compiler[GeneratedAggregations] {
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: RowTimeWindowPropertyCollector = _
   private val intermediateRowWindowStartPos = keysAndAggregatesArity
   private val intermediateRowWindowEndPos = keysAndAggregatesArity + 1
 
@@ -78,7 +78,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+    collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index a221c53..56ed08a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -47,7 +47,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
   extends RichGroupReduceFunction[Row, Row]
     with Compiler[GeneratedAggregations] {
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: RowTimeWindowPropertyCollector = _
   protected val windowStartPos: Int = keysAndAggregatesArity
 
   private var output: Row = _
@@ -68,7 +68,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+    collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index f4a1fc5..8af2c2e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -46,7 +46,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
   extends RichGroupReduceFunction[Row, Row]
     with Compiler[GeneratedAggregations] {
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: RowTimeWindowPropertyCollector = _
   protected var aggregateBuffer: Row = new Row(keysAndAggregatesArity + 1)
 
   private var output: Row = _
@@ -67,7 +67,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 81c900c..745f24d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.slf4j.LoggerFactory
+import org.apache.flink.table.runtime.types.CRow
 
 /**
   * Aggregate Function used for the groupby (without window) aggregate
@@ -35,14 +36,17 @@ import org.slf4j.LoggerFactory
   */
 class GroupAggProcessFunction(
     private val genAggregations: GeneratedAggregationsFunction,
-    private val aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[Row, Row]
+    private val aggregationStateType: RowTypeInfo,
+    private val generateRetraction: Boolean)
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
-  private var output: Row = _
+  private var newRow: CRow = _
+  private var prevRow: CRow = _
+  private var firstRow: Boolean = _
   private var state: ValueState[Row] = _
 
   override def open(config: Configuration) {
@@ -54,7 +58,9 @@ class GroupAggProcessFunction(
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
-    output = function.createOutputRow()
+
+    newRow = new CRow(function.createOutputRow(), true)
+    prevRow = new CRow(function.createOutputRow(), false)
 
     val stateDescriptor: ValueStateDescriptor[Row] =
       new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
@@ -62,29 +68,53 @@ class GroupAggProcessFunction(
   }
 
   override def processElement(
-      input: Row,
-      ctx: ProcessFunction[Row, Row]#Context,
-      out: Collector[Row]): Unit = {
+      inputC: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     // get accumulators
     var accumulators = state.value()
     if (null == accumulators) {
+      firstRow = true
       accumulators = function.createAccumulators()
+    } else {
+      firstRow = false
     }
 
     // Set group keys value to the final output
-    function.setForwardedFields(input, output)
+    function.setForwardedFields(input, newRow.row)
+    function.setForwardedFields(input, prevRow.row)
 
-    // accumulate new input row
-    function.accumulate(accumulators, input)
+    // Set previous aggregate result to the prevRow
+    function.setAggregationResults(accumulators, prevRow.row)
 
-    // set aggregation results to output
-    function.setAggregationResults(accumulators, output)
+    // update aggregate result and set to the newRow
+    if (inputC.change) {
+      // accumulate input
+      function.accumulate(accumulators, input)
+      function.setAggregationResults(accumulators, newRow.row)
+    } else {
+      // retract input
+      function.retract(accumulators, input)
+      function.setAggregationResults(accumulators, newRow.row)
+    }
 
     // update accumulators
     state.update(accumulators)
 
-    out.collect(output)
-  }
+    // if this is not the first row and retraction is enabled, retract the previous result
+    if (generateRetraction && !firstRow) {
+      if (prevRow.row.equals(newRow.row)) {
+        // ignore same newRow
+        return
+      } else {
+        // retract previous row
+        out.collect(prevRow)
+      }
+    }
 
+    out.collect(newRow)
+  }
 }
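
The net effect of the new processElement: each update of a group's aggregate first re-emits the previous result with change = false (when retraction generation is enabled and it is not the group's first result), then the new result with change = true, and an unchanged result is swallowed entirely. A self-contained trace of that emission pattern for one key and a COUNT aggregate (plain Scala, with state handling and code generation elided, and generateRetraction assumed true):

    object RetractionTrace extends App {
      var count: Long = 0L          // stands in for the accumulator state
      var firstRow: Boolean = true

      def processElement(change: Boolean): Unit = {
        val prev = count
        if (change) count += 1 else count -= 1    // accumulate or retract input
        if (!firstRow) {
          if (prev == count) return               // unchanged result: emit nothing
          println(s"emit CRow(count = $prev, change = false)")  // retract old result
        }
        println(s"emit CRow(count = $count, change = true)")    // emit new result
        firstRow = false
      }

      processElement(change = true)   // -> (1, true)
      processElement(change = true)   // -> (1, false), (2, true)
      processElement(change = false)  // -> (2, false), (1, true)
    }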

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
index ec9b654..711cc05 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -22,6 +22,7 @@ import java.lang.Iterable
 import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.util.Collector
 
 /**
@@ -39,17 +40,17 @@ class IncrementalAggregateAllTimeWindowFunction(
   extends IncrementalAggregateAllWindowFunction[TimeWindow](
     finalRowArity) {
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: CRowTimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
     super.open(parameters)
   }
 
   override def apply(
       window: TimeWindow,
       records: Iterable[Row],
-      out: Collector[Row]): Unit = {
+      out: Collector[CRow]): Unit = {
 
     // set collector and window
     collector.wrappedCollector = out

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index f92be92..c190785 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -23,6 +23,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.util.Collector
 
 /**
@@ -32,12 +33,12 @@ import org.apache.flink.util.Collector
   */
 class IncrementalAggregateAllWindowFunction[W <: Window](
     private val finalRowArity: Int)
-  extends RichAllWindowFunction[Row, Row, W] {
+  extends RichAllWindowFunction[Row, CRow, W] {
 
-  private var output: Row = _
+  private var output: CRow = _
 
   override def open(parameters: Configuration): Unit = {
-    output = new Row(finalRowArity)
+    output = new CRow(new Row(finalRowArity), true)
   }
 
   /**
@@ -47,7 +48,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
   override def apply(
       window: W,
       records: Iterable[Row],
-      out: Collector[Row]): Unit = {
+      out: Collector[CRow]): Unit = {
 
     val iterator = records.iterator
 
@@ -55,7 +56,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
       val record = iterator.next()
       var i = 0
       while (i < record.getArity) {
-        output.setField(i, record.getField(i))
+        output.row.setField(i, record.getField(i))
         i += 1
       }
       out.collect(output)

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index dccb4f6..809bbfd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.util.Collector
 
 /**
@@ -43,10 +44,10 @@ class IncrementalAggregateTimeWindowFunction(
     numAggregates,
     finalRowArity) {
 
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: CRowTimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
     super.open(parameters)
   }
 
@@ -54,7 +55,7 @@ class IncrementalAggregateTimeWindowFunction(
       key: Tuple,
       window: TimeWindow,
       records: Iterable[Row],
-      out: Collector[Row]): Unit = {
+      out: Collector[CRow]): Unit = {
 
     // set collector and window
     collector.wrappedCollector = out

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index 983efb3..7e9d738 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -24,6 +24,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.util.Collector
 
 /**
@@ -37,12 +38,12 @@ class IncrementalAggregateWindowFunction[W <: Window](
     private val numGroupingKey: Int,
     private val numAggregates: Int,
     private val finalRowArity: Int)
-  extends RichWindowFunction[Row, Row, Tuple, W] {
+  extends RichWindowFunction[Row, CRow, Tuple, W] {
 
-  private var output: Row = _
+  private var output: CRow = _
 
   override def open(parameters: Configuration): Unit = {
-    output = new Row(finalRowArity)
+    output = new CRow(new Row(finalRowArity), true)
   }
 
   /**
@@ -53,7 +54,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
       key: Tuple,
       window: W,
       records: Iterable[Row],
-      out: Collector[Row]): Unit = {
+      out: Collector[CRow]): Unit = {
 
     val iterator = records.iterator
 
@@ -62,12 +63,12 @@ class IncrementalAggregateWindowFunction[W <: Window](
 
       var i = 0
       while (i < numGroupingKey) {
-        output.setField(i, key.getField(i))
+        output.row.setField(i, key.getField(i))
         i += 1
       }
       i = 0
       while (i < numAggregates) {
-        output.setField(numGroupingKey + i, record.getField(i))
+        output.row.setField(numGroupingKey + i, record.getField(i))
         i += 1
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index b63eb81..3fb506f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -31,7 +31,8 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
 import java.util.{ArrayList, List => JList}
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
 
 /**
@@ -47,10 +48,10 @@ class ProcTimeBoundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     precedingTimeBoundary: Long,
     aggregatesTypeInfo: RowTypeInfo,
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
+    inputType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
-  private var output: Row = _
+  private var output: CRow = _
   private var accumulatorState: ValueState[Row] = _
   private var rowMapState: MapState[Long, JList[Row]] = _
 
@@ -66,11 +67,12 @@ class ProcTimeBoundedRangeOver(
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
 
    // We keep the elements received in a MapState indexed by their ingestion time
     val rowListTypeInfo: TypeInformation[JList[Row]] =
-      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+      new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
+        .asInstanceOf[TypeInformation[JList[Row]]]
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
@@ -82,9 +84,9 @@ class ProcTimeBoundedRangeOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    input: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
 
     val currentTime = ctx.timerService.currentProcessingTime
    // buffer the incoming event
@@ -97,15 +99,15 @@ class ProcTimeBoundedRangeOver(
       // register timer to process event once the current millisecond passed
       ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
     }
-    rowList.add(input)
+    rowList.add(input.row)
     rowMapState.put(currentTime, rowList)
 
   }
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
 
    // we consider the original timestamp of events that registered this timer 1 ms ago
     val currentTime = timestamp - 1
@@ -166,10 +168,10 @@ class ProcTimeBoundedRangeOver(
       val input = currentElements.get(iElemenets)
 
       // set the fields of the last event to carry on with the aggregates
-      function.setForwardedFields(input, output)
+      function.setForwardedFields(input, output.row)
 
       // add the accumulators values to result
-      function.setAggregationResults(accumulators, output)
+      function.setAggregationResults(accumulators, output.row)
       out.collect(output)
       iElemenets += 1
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
index 31cfd73..0c7f44e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -33,7 +33,8 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
 import java.util.{List => JList}
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
 
 /**
@@ -48,15 +49,15 @@ class ProcTimeBoundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     precedingOffset: Long,
     aggregatesTypeInfo: RowTypeInfo,
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
+    inputType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
   Preconditions.checkArgument(precedingOffset > 0)
 
   private var accumulatorState: ValueState[Row] = _
   private var rowMapState: MapState[Long, JList[Row]] = _
-  private var output: Row = _
+  private var output: CRow = _
   private var counterState: ValueState[Long] = _
   private var smallestTsState: ValueState[Long] = _
 
@@ -73,13 +74,14 @@ class ProcTimeBoundedRowsOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
     // We keep the elements received in a Map state keyed
     // by the ingestion time in the operator.
    // we also keep a counter of processed elements
    // and the timestamp of the oldest element
     val rowListTypeInfo: TypeInformation[JList[Row]] =
-      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+      new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
+        .asInstanceOf[TypeInformation[JList[Row]]]
 
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
@@ -100,9 +102,11 @@ class ProcTimeBoundedRowsOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     val currentTime = ctx.timerService.currentProcessingTime
 
@@ -154,11 +158,11 @@ class ProcTimeBoundedRowsOver(
     }
 
     // copy forwarded fields in output row
-    function.setForwardedFields(input, output)
+    function.setForwardedFields(input, output.row)
 
     // accumulate current row and set aggregate in output row
     function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
+    function.setAggregationResults(accumulators, output.row)
 
     // update map state, accumulator state, counter and timestamp
     val currentTimeState = rowMapState.get(currentTime)

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
index 75209db..8a23132 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
@@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
 import org.slf4j.LoggerFactory
 
 /**
@@ -37,12 +38,12 @@ import org.slf4j.LoggerFactory
 class ProcTimeUnboundedNonPartitionedOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[Row, Row]
+  extends ProcessFunction[CRow, CRow]
     with CheckpointedFunction
     with Compiler[GeneratedAggregations] {
 
   private var accumulators: Row = _
-  private var output: Row = _
+  private var output: CRow = _
   private var state: ListState[Row] = _
   val LOG = LoggerFactory.getLogger(this.getClass)
 
@@ -58,7 +59,7 @@ class ProcTimeUnboundedNonPartitionedOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
     if (null == accumulators) {
       val it = state.get().iterator()
       if (it.hasNext) {
@@ -70,14 +71,16 @@ class ProcTimeUnboundedNonPartitionedOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+      inputC: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
-    function.setForwardedFields(input, output)
+    function.setForwardedFields(input, output.row)
 
     function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
+    function.setAggregationResults(accumulators, output.row)
 
     out.collect(output)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
index 9baa6a3..847c1bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
@@ -24,7 +24,8 @@ import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
 import org.slf4j.LoggerFactory
 
 /**
@@ -36,10 +37,10 @@ import org.slf4j.LoggerFactory
 class ProcTimeUnboundedPartitionedOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo)
-  extends ProcessFunction[Row, Row]
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
-  private var output: Row = _
+  private var output: CRow = _
   private var state: ValueState[Row] = _
   val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
@@ -54,16 +55,18 @@ class ProcTimeUnboundedPartitionedOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
     val stateDescriptor: ValueStateDescriptor[Row] =
       new ValueStateDescriptor[Row]("overState", aggregationStateType)
     state = getRuntimeContext.getState(stateDescriptor)
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     var accumulators = state.value()
 
@@ -71,13 +74,12 @@ class ProcTimeUnboundedPartitionedOver(
       accumulators = function.createAccumulators()
     }
 
-    function.setForwardedFields(input, output)
+    function.setForwardedFields(input, output.row)
 
     function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output)
+    function.setAggregationResults(accumulators, output.row)
 
     state.update(accumulators)
-
     out.collect(output)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index ef97e71..4020d44 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -17,14 +17,15 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
-import java.util.{List => JList, ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, List => JList}
 
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
 import org.slf4j.LoggerFactory
@@ -39,15 +40,15 @@ import org.slf4j.LoggerFactory
  */
 class RowTimeBoundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: TypeInformation[Row],
-    inputRowType: TypeInformation[Row],
+    aggregationStateType: RowTypeInfo,
+    inputRowType: CRowTypeInfo,
     precedingOffset: Long)
-  extends ProcessFunction[Row, Row]
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
   Preconditions.checkNotNull(aggregationStateType)
   Preconditions.checkNotNull(precedingOffset)
 
-  private var output: Row = _
+  private var output: CRow = _
 
   // the state which keeps the last triggering timestamp
   private var lastTriggeringTsState: ValueState[Long] = _
@@ -74,7 +75,7 @@ class RowTimeBoundedRangeOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
 
     val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
       new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
@@ -86,7 +87,8 @@ class RowTimeBoundedRangeOver(
 
     val keyTypeInformation: TypeInformation[Long] =
       BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
-    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+    val valueTypeInformation: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType)
 
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]](
@@ -98,9 +100,11 @@ class RowTimeBoundedRangeOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     // triggering timestamp for trigger calculation
     val triggeringTs = ctx.timestamp
@@ -125,8 +129,8 @@ class RowTimeBoundedRangeOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
     // gets all window data from state for the calculation
     val inputs: JList[Row] = dataState.get(timestamp)
 
@@ -172,13 +176,13 @@ class RowTimeBoundedRangeOver(
       }
 
       // set aggregate in output row
-      function.setAggregationResults(accumulators, output)
+      function.setAggregationResults(accumulators, output.row)
 
       // copy forwarded fields to output row and emit output row
       dataListIndex = 0
       while (dataListIndex < inputs.size()) {
         aggregatesIndex = 0
-        function.setForwardedFields(inputs.get(dataListIndex), output)
+        function.setForwardedFields(inputs.get(dataListIndex), output.row)
         out.collect(output)
         dataListIndex += 1
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 7169cf7..5ec6ec7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -27,7 +27,8 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
 
 /**
@@ -41,15 +42,15 @@ import org.slf4j.LoggerFactory
 class RowTimeBoundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
-    inputRowType: TypeInformation[Row],
+    inputRowType: CRowTypeInfo,
     precedingOffset: Long)
-  extends ProcessFunction[Row, Row]
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
   Preconditions.checkNotNull(aggregationStateType)
   Preconditions.checkNotNull(precedingOffset)
 
-  private var output: Row = _
+  private var output: CRow = _
 
   // the state which keeps the last triggering timestamp
   private var lastTriggeringTsState: ValueState[Long] = _
@@ -79,7 +80,7 @@ class RowTimeBoundedRowsOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
 
     val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
       new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
@@ -95,7 +96,8 @@ class RowTimeBoundedRowsOver(
 
     val keyTypeInformation: TypeInformation[Long] =
       BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
-    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+    val valueTypeInformation: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType)
 
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]](
@@ -107,9 +109,11 @@ class RowTimeBoundedRowsOver(
   }
 
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     // triggering timestamp for trigger calculation
     val triggeringTs = ctx.timestamp
@@ -134,8 +138,8 @@ class RowTimeBoundedRowsOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[Row, Row]#OnTimerContext,
-    out: Collector[Row]): Unit = {
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
 
     // gets all window data from state for the calculation
     val inputs: JList[Row] = dataState.get(timestamp)
@@ -189,7 +193,7 @@ class RowTimeBoundedRowsOver(
         }
 
         // copy forwarded fields to output row
-        function.setForwardedFields(input, output)
+        function.setForwardedFields(input, output.row)
 
         // retract old row from accumulators
         if (null != retractRow) {
@@ -198,7 +202,7 @@ class RowTimeBoundedRowsOver(
 
         // accumulate current row and set aggregate in output row
         function.accumulate(accumulators, input)
-        function.setAggregationResults(accumulators, output)
+        function.setAggregationResults(accumulators, output.row)
         i += 1
 
         out.collect(output)

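The retract-then-accumulate sequence above is what bounds the aggregate to the last
precedingOffset + 1 rows. The same idea in miniature, with a plain sum standing in
for the generated aggregations (everything here is illustrative, not commit code):

    import scala.collection.mutable

    class SlidingSum(precedingOffset: Long) {
      private val window = mutable.Queue.empty[Long]
      private var acc = 0L                           // the "accumulator"

      def accumulate(value: Long): Long = {
        window.enqueue(value)
        acc += value
        if (window.size > precedingOffset + 1) {
          acc -= window.dequeue()                    // "retract" the expired row
        }
        acc                                          // sum over the last N rows
      }
    }

    // 1 preceding row + the current row = a window of 2 rows
    val sum2 = new SlidingSum(precedingOffset = 1)
    assert(Seq(1L, 2L, 3L).map(sum2.accumulate) == Seq(1L, 3L, 5L))
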
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index 525d4d7..3e2a811 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -28,7 +28,8 @@ import org.apache.flink.util.{Collector, Preconditions}
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.streaming.api.operators.TimestampedCollector
-import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.slf4j.LoggerFactory
 
 
@@ -42,11 +43,11 @@ import org.slf4j.LoggerFactory
 abstract class RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
+    inputType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
-  protected var output: Row = _
+  protected var output: CRow = _
   // state to hold the accumulators of the aggregations
   private var accumulatorState: ValueState[Row] = _
   // state to hold rows until the next watermark arrives
@@ -67,7 +68,7 @@ abstract class RowTimeUnboundedOver(
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
 
-    output = function.createOutputRow()
+    output = new CRow(function.createOutputRow(), true)
     sortedTimestamps = new util.LinkedList[Long]()
 
     // initialize accumulator state
@@ -76,7 +77,8 @@ abstract class RowTimeUnboundedOver(
     accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
 
     // initialize row state
-    val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType)
     val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
       new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
         BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
@@ -87,15 +89,17 @@ abstract class RowTimeUnboundedOver(
     * Puts an element from the input stream into state if it is not late.
     * Registers a timer for the next watermark.
     *
-    * @param input The input value.
+    * @param inputC The input value.
    * @param ctx   The context to register timers or query the current time.
     * @param out   The collector for returning result values.
     *
     */
   override def processElement(
-     input: Row,
-     ctx:  ProcessFunction[Row, Row]#Context,
-     out: Collector[Row]): Unit = {
+     inputC: CRow,
+     ctx:  ProcessFunction[CRow, CRow]#Context,
+     out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
 
     val timestamp = ctx.timestamp()
     val curWatermark = ctx.timerService().currentWatermark()
@@ -126,11 +130,11 @@ abstract class RowTimeUnboundedOver(
     */
   override def onTimer(
       timestamp: Long,
-      ctx: ProcessFunction[Row, Row]#OnTimerContext,
-      out: Collector[Row]): Unit = {
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
 
-    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
-    val collector = out.asInstanceOf[TimestampedCollector[Row]]
+    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]])
+    val collector = out.asInstanceOf[TimestampedCollector[CRow]]
 
     val keyIterator = rowMapState.keys.iterator
     if (keyIterator.hasNext) {
@@ -206,7 +210,7 @@ abstract class RowTimeUnboundedOver(
   def processElementsWithSameTimestamp(
     curRowList: JList[Row],
     lastAccumulator: Row,
-    out: Collector[Row]): Unit
+    out: Collector[CRow]): Unit
 
 }
 
@@ -217,7 +221,7 @@ abstract class RowTimeUnboundedOver(
 class RowTimeUnboundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
+    inputType: TypeInformation[CRow])
   extends RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
@@ -226,7 +230,7 @@ class RowTimeUnboundedRowsOver(
   override def processElementsWithSameTimestamp(
     curRowList: JList[Row],
     lastAccumulator: Row,
-    out: Collector[Row]): Unit = {
+    out: Collector[CRow]): Unit = {
 
     var i = 0
     while (i < curRowList.size) {
@@ -234,11 +238,11 @@ class RowTimeUnboundedRowsOver(
 
       var j = 0
       // copy forwarded fields to output row
-      function.setForwardedFields(curRow, output)
+      function.setForwardedFields(curRow, output.row)
 
       // update accumulators and copy aggregates to output row
       function.accumulate(lastAccumulator, curRow)
-      function.setAggregationResults(lastAccumulator, output)
+      function.setAggregationResults(lastAccumulator, output.row)
       // emit output row
       out.collect(output)
       i += 1
@@ -255,7 +259,7 @@ class RowTimeUnboundedRowsOver(
 class RowTimeUnboundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
-    inputType: TypeInformation[Row])
+    inputType: TypeInformation[CRow])
   extends RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
@@ -264,7 +268,7 @@ class RowTimeUnboundedRangeOver(
   override def processElementsWithSameTimestamp(
     curRowList: JList[Row],
     lastAccumulator: Row,
-    out: Collector[Row]): Unit = {
+    out: Collector[CRow]): Unit = {
 
     var i = 0
    // all rows with the same timestamp share the same aggregation value.
@@ -281,10 +285,10 @@ class RowTimeUnboundedRangeOver(
       val curRow = curRowList.get(i)
 
       // copy forwarded fields to output row
-      function.setForwardedFields(curRow, output)
+      function.setForwardedFields(curRow, output.row)
 
      // copy aggregates to output row
-      function.setAggregationResults(lastAccumulator, output)
+      function.setAggregationResults(lastAccumulator, output.row)
       out.collect(output)
       i += 1
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
index 9502607..0c8ae00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.aggregate
 
 import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 
@@ -26,29 +27,48 @@ import org.apache.flink.util.Collector
   * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
   * collector.
   */
-class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
-    extends Collector[Row] {
+abstract class TimeWindowPropertyCollector[T](
+    windowStartOffset: Option[Int],
+    windowEndOffset: Option[Int])
+  extends Collector[T] {
 
-  var wrappedCollector: Collector[Row] = _
+  var wrappedCollector: Collector[T] = _
+  var output: Row = _
   var windowStart:Long = _
   var windowEnd:Long = _
 
-  override def collect(record: Row): Unit = {
+  def getRow(record: T): Row
 
-    val lastFieldPos = record.getArity - 1
+  override def collect(record: T): Unit = {
+
+    output = getRow(record)
+    val lastFieldPos = output.getArity - 1
 
     if (windowStartOffset.isDefined) {
-      record.setField(
+      output.setField(
         lastFieldPos + windowStartOffset.get,
         SqlFunctions.internalToTimestamp(windowStart))
     }
     if (windowEndOffset.isDefined) {
-      record.setField(
+      output.setField(
         lastFieldPos + windowEndOffset.get,
         SqlFunctions.internalToTimestamp(windowEnd))
     }
+
     wrappedCollector.collect(record)
   }
 
   override def close(): Unit = wrappedCollector.close()
 }
+
+class RowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int])
+  extends TimeWindowPropertyCollector[Row](startOffset, endOffset) {
+
+  override def getRow(record: Row): Row = record
+}
+
+class CRowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int])
+  extends TimeWindowPropertyCollector[CRow](startOffset, endOffset) {
+
+  override def getRow(record: CRow): Row = record.row
+}

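A quick usage sketch of the new CRow variant (assumed setup, not commit code).
Offsets are added to the last field position of the payload Row, so with a
three-field layout of (value, windowStart, windowEnd) the offsets -1 and 0 address
fields 1 and 2:

    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // trivial downstream collector, used only for this sketch
    val downstream = new Collector[CRow] {
      override def collect(record: CRow): Unit = println(record)
      override def close(): Unit = ()
    }

    val collector = new CRowTimeWindowPropertyCollector(Some(-1), Some(0))
    collector.wrappedCollector = downstream
    collector.windowStart = 1000L
    collector.windowEnd = 2000L

    val row = new Row(3)
    row.setField(0, "key")
    // patches fields 1 and 2 with SQL timestamps, then forwards the CRow unchanged
    collector.collect(CRow(row, change = true))
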
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
new file mode 100644
index 0000000..ec73fa6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io
+
+import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.core.io.GenericInputSplit
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+class CRowValuesInputFormat(
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[CRow])
+  extends GenericInputFormat[CRow]
+  with NonParallelInput
+  with ResultTypeQueryable[CRow]
+  with Compiler[GenericInputFormat[Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var format: GenericInputFormat[Row] = _
+
+  override def open(split: GenericInputSplit): Unit = {
+    LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating GenericInputFormat.")
+    format = clazz.newInstance()
+  }
+
+  override def reachedEnd(): Boolean = format.reachedEnd()
+
+  override def nextRecord(reuse: CRow): CRow = {
+    reuse.row = format.nextRecord(reuse.row)
+    reuse.change = true
+    reuse
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+}

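Note the reuse contract here: nextRecord writes the decoded Row back into the
passed-in CRow and forces change = true, since a values source only ever produces
accumulate messages. A sketch of the read loop a caller would run, assuming 'format'
has already been opened (the format itself is built from generated code, elided):

    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row

    def readAll(format: CRowValuesInputFormat): Unit = {
      val reuse = new CRow(new Row(format.getProducedType.getArity), true)
      while (!format.reachedEnd()) {
        val cRow = format.nextRecord(reuse)  // mutates and returns 'reuse'
        println(cRow)                        // always rendered with a leading '+'
      }
    }
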
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
index 1a339e6..d536b39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -23,20 +23,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.core.io.GenericInputSplit
+import org.apache.flink.types.Row
 import org.slf4j.LoggerFactory
 
-class ValuesInputFormat[OUT](
+class ValuesInputFormat(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT])
-  extends GenericInputFormat[OUT]
+    @transient returnType: TypeInformation[Row])
+  extends GenericInputFormat[Row]
   with NonParallelInput
-  with ResultTypeQueryable[OUT]
-  with Compiler[GenericInputFormat[OUT]] {
+  with ResultTypeQueryable[Row]
+  with Compiler[GenericInputFormat[Row]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  private var format: GenericInputFormat[OUT] = _
+  private var format: GenericInputFormat[Row] = _
 
   override def open(split: GenericInputSplit): Unit = {
     LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
@@ -47,7 +48,7 @@ class ValuesInputFormat[OUT](
 
   override def reachedEnd(): Boolean = format.reachedEnd()
 
-  override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
+  override def nextRecord(reuse: Row): Row = format.nextRecord(reuse)
 
-  override def getProducedType: TypeInformation[OUT] = returnType
+  override def getProducedType: TypeInformation[Row] = returnType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
new file mode 100644
index 0000000..25ec8c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.types.Row
+
+/**
+  * Wrapper for a [[Row]] to add retraction information.
+  *
+  * If [[change]] is true, the [[CRow]] is an accumulate message; if it is false, it is a
+  * retraction message.
+  *
+  * @param row The wrapped [[Row]].
+  * @param change true for an accumulate message, false for a retraction message.
+  */
+class CRow(var row: Row, var change: Boolean) {
+
+  def this() {
+    this(null, true)
+  }
+
+  override def toString: String = s"${if(change) "+" else "-"}$row"
+
+  override def equals(other: scala.Any): Boolean = {
+    val otherCRow = other.asInstanceOf[CRow]
+    row.equals(otherCRow.row) && change == otherCRow.change
+  }
+}
+
+object CRow {
+
+  def apply(): CRow = {
+    new CRow()
+  }
+
+  def apply(row: Row, change: Boolean): CRow = {
+    new CRow(row, change)
+  }
+}

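The semantics in one example (a sketch, not commit code): the same payload Row is
tagged as an accumulate or a retraction purely via the change flag, and toString
renders the flag as a +/- prefix:

    import org.apache.flink.types.Row

    val r = new Row(2)
    r.setField(0, "Alice")
    r.setField(1, java.lang.Long.valueOf(42L))

    println(CRow(r, change = true))   // +Alice,42
    println(CRow(r, change = false))  // -Alice,42
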
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
new file mode 100644
index 0000000..d848c65
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment}
+import org.apache.flink.types.Row
+
+class CRowComparator(val rowComp: TypeComparator[Row]) extends TypeComparator[CRow] {
+
+  override def hash(record: CRow): Int = rowComp.hash(record.row)
+
+  override def setReference(toCompare: CRow): Unit = rowComp.setReference(toCompare.row)
+
+  override def equalToReference(candidate: CRow): Boolean = rowComp.equalToReference(candidate.row)
+
+  override def compareToReference(otherComp: TypeComparator[CRow]): Int = {
+    val otherCRowComp = otherComp.asInstanceOf[CRowComparator]
+    rowComp.compareToReference(otherCRowComp.rowComp)
+  }
+
+  override def compare(first: CRow, second: CRow): Int = {
+    rowComp.compare(first.row, second.row)
+  }
+
+  override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
+    rowComp.compareSerialized(firstSource, secondSource)
+  }
+
+  override def supportsNormalizedKey(): Boolean = rowComp.supportsNormalizedKey()
+
+  override def supportsSerializationWithKeyNormalization(): Boolean =
+    rowComp.supportsSerializationWithKeyNormalization()
+
+  override def getNormalizeKeyLen: Int = rowComp.getNormalizeKeyLen
+
+  override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
+    rowComp.isNormalizedKeyPrefixOnly(keyBytes)
+
+  override def putNormalizedKey(
+      record: CRow,
+      target: MemorySegment,
+      offset: Int,
+      numBytes: Int): Unit = rowComp.putNormalizedKey(record.row, target, offset, numBytes)
+
+  override def writeWithKeyNormalization(record: CRow, target: DataOutputView): Unit = {
+    rowComp.writeWithKeyNormalization(record.row, target)
+    target.writeBoolean(record.change)
+  }
+
+  override def readWithKeyDenormalization(reuse: CRow, source: DataInputView): CRow = {
+    val row = rowComp.readWithKeyDenormalization(reuse.row, source)
+    reuse.row = row
+    reuse.change = source.readBoolean()
+    reuse
+  }
+
+  override def invertNormalizedKey(): Boolean = rowComp.invertNormalizedKey()
+
+  override def duplicate(): TypeComparator[CRow] = new CRowComparator(rowComp.duplicate())
+
+  override def extractKeys(record: scala.Any, target: Array[AnyRef], index: Int): Int =
+    rowComp.extractKeys(record.asInstanceOf[CRow].row, target, index)
+
+  override def getFlatComparators: Array[TypeComparator[_]] =
+    rowComp.getFlatComparators
+}

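A comparator is normally obtained through CRowTypeInfo (see below) rather than
constructed directly. A sketch, with rowOf as a placeholder helper: compare results
are fully delegated to the wrapped Row, so the change flag never influences ordering:

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    def rowOf(values: AnyRef*): Row = {  // placeholder helper
      val r = new Row(values.length)
      values.zipWithIndex.foreach { case (v, i) => r.setField(i, v) }
      r
    }

    val cRowType = CRowTypeInfo(new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO))

    val comp = cRowType.createComparator(Array(0), Array(true), 0, new ExecutionConfig)

    val a = CRow(rowOf("Alice", java.lang.Long.valueOf(1L)), change = true)
    val b = CRow(rowOf("Bob",   java.lang.Long.valueOf(1L)), change = false)
    assert(comp.compare(a, b) < 0)  // compares the wrapped Rows only
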
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
new file mode 100644
index 0000000..1f56a98
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.types.Row
+
+class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSerializer[CRow] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[CRow] = new CRowSerializer(rowSerializer.duplicate())
+
+  override def createInstance(): CRow = new CRow(rowSerializer.createInstance(), true)
+
+  override def copy(from: CRow): CRow = new CRow(rowSerializer.copy(from.row), from.change)
+
+  override def copy(from: CRow, reuse: CRow): CRow = {
+    rowSerializer.copy(from.row, reuse.row)
+    reuse.change = from.change
+    reuse
+  }
+
+  override def getLength: Int = -1
+
+  override def serialize(record: CRow, target: DataOutputView): Unit = {
+    rowSerializer.serialize(record.row, target)
+    target.writeBoolean(record.change)
+  }
+
+  override def deserialize(source: DataInputView): CRow = {
+    val row = rowSerializer.deserialize(source)
+    val change = source.readBoolean()
+    new CRow(row, change)
+  }
+
+  override def deserialize(reuse: CRow, source: DataInputView): CRow = {
+    rowSerializer.deserialize(reuse.row, source)
+    reuse.change = source.readBoolean()
+    reuse
+  }
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    rowSerializer.copy(source, target)
+    target.writeBoolean(source.readBoolean())
+  }
+
+  override def canEqual(obj: Any): Boolean = obj.isInstanceOf[CRowSerializer]
+
+  override def equals(obj: Any): Boolean = {
+
+    if (canEqual(obj)) {
+      val other = obj.asInstanceOf[CRowSerializer]
+      rowSerializer.equals(other.rowSerializer)
+    } else {
+      false
+    }
+  }
+
+  override def hashCode: Int = rowSerializer.hashCode() * 13
+}

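A round-trip sketch (assumed setup, not commit code): the wire format is simply the
row serializer's output followed by one boolean, so the retraction flag survives
serialization:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
    import org.apache.flink.types.Row

    val rowType = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO)
    val serializer = new CRowSerializer(rowType.createSerializer(new ExecutionConfig))

    val row = new Row(1)
    row.setField(0, "Alice")

    val bytes = new ByteArrayOutputStream()
    serializer.serialize(new CRow(row, false), new DataOutputViewStreamWrapper(bytes))

    val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray))
    val copy = serializer.deserialize(in)
    assert(!copy.change)  // retraction flag preserved
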
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
new file mode 100644
index 0000000..456207a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+
+class CRowTypeInfo(val rowType: RowTypeInfo) extends CompositeType[CRow](classOf[CRow]) {
+
+  override def getFieldNames: Array[String] = rowType.getFieldNames
+
+  override def getFieldIndex(fieldName: String): Int = rowType.getFieldIndex(fieldName)
+
+  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] =
+    rowType.getTypeAt(fieldExpression)
+
+  override def getTypeAt[X](pos: Int): TypeInformation[X] =
+    rowType.getTypeAt(pos)
+
+  override def getFlatFields(
+      fieldExpression: String,
+      offset: Int,
+      result: util.List[FlatFieldDescriptor]): Unit =
+    rowType.getFlatFields(fieldExpression, offset, result)
+
+  override def isBasicType: Boolean = rowType.isBasicType
+
+  override def isTupleType: Boolean = rowType.isTupleType
+
+  override def getArity: Int = rowType.getArity
+
+  override def getTotalFields: Int = rowType.getTotalFields
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[CRow] =
+    new CRowSerializer(rowType.createSerializer(config))
+
+  // not implemented because we override createComparator
+  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[CRow] = null
+
+  override def createComparator(
+      logicalKeyFields: Array[Int],
+      orders: Array[Boolean],
+      logicalFieldOffset: Int,
+      config: ExecutionConfig): TypeComparator[CRow] = {
+
+    val rowComparator = rowType.createComparator(
+      logicalKeyFields,
+      orders,
+      logicalFieldOffset,
+      config)
+
+    new CRowComparator(rowComparator)
+  }
+
+  override def equals(obj: scala.Any): Boolean = {
+    if (this.canEqual(obj)) {
+      rowType.equals(obj.asInstanceOf[CRowTypeInfo].rowType)
+    } else {
+      false
+    }
+  }
+
+  override def canEqual(obj: scala.Any): Boolean = obj.isInstanceOf[CRowTypeInfo]
+
+}
+
+object CRowTypeInfo {
+
+  def apply(rowType: TypeInformation[Row]): CRowTypeInfo = {
+    rowType match {
+      case r: RowTypeInfo => new CRowTypeInfo(r)
+    }
+  }
+
+}
+

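Construction sketch (not commit code). Note that the companion's apply only matches
RowTypeInfo, so any other TypeInformation[Row] implementation would fail with a
MatchError at runtime:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    val cRowType = CRowTypeInfo(new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO))

    assert(cRowType.getArity == 2)
    assert(cRowType.getFieldNames.sameElements(Array("f0", "f1")))  // RowTypeInfo defaults
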
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index c37ee74..4a2fcdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -25,6 +25,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   * A simple [[TableSink]] to emit data as CSV files.
@@ -133,3 +134,4 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
     builder.mkString
   }
 }
+