You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:46 UTC

[17/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala
new file mode 100644
index 0000000..a614783
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/Aggregate.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+
+/**
+ * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
+ * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
+ * -- In Map phase, use prepare() to transform aggregate field value into intermediate
+ * aggregate value.
+ * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
+ * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
+ * For associative decomposable aggregate functions, they support partial aggregate. To optimize
+ * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
+ * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
+ * into aggregate buffer.
+ *
+ * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
+ * field index in Row, so different aggregate functions could share the same Row as intermediate
+ * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
+ * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
+ * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
+ *
+ * @tparam T Aggregated value type.
+ */
+trait Aggregate[T] extends Serializable {
+
+  /**
+    * Transform the aggregate field value into intermediate aggregate data.
+    *
+    * @param value The value to insert into the intermediate aggregate row.
+    * @param intermediate The intermediate aggregate row into which the value is inserted.
+    */
+  def prepare(value: Any, intermediate: Row): Unit
+
+  /**
+    * Initiate the intermediate aggregate value in Row.
+    *
+    * @param intermediate The intermediate aggregate row to initiate.
+    */
+  def initiate(intermediate: Row): Unit
+
+  /**
+    * Merge intermediate aggregate data into aggregate buffer.
+    *
+    * @param intermediate The intermediate aggregate row to merge.
+    * @param buffer The aggregate buffer into which the intermedidate is merged.
+    */
+  def merge(intermediate: Row, buffer: Row): Unit
+
+  /**
+    * Calculate the final aggregated result based on aggregate buffer.
+    *
+    * @param buffer The aggregate buffer from which the final aggregate is computed.
+    * @return The final result of the aggregate.
+    */
+  def evaluate(buffer: Row): T
+
+  /**
+    * Intermediate aggregate value types.
+    *
+    * @return The types of the intermediate fields of this aggregate.
+    */
+  def intermediateDataType: Array[TypeInformation[_]]
+
+  /**
+    * Set the aggregate data offset in Row.
+    *
+    * @param aggOffset The offset of this aggregate in the intermediate aggregate rows.
+    */
+  def setAggOffsetInRow(aggOffset: Int)
+
+  /**
+    * Whether aggregate function support partial aggregate.
+    *
+    * @return True if the aggregate supports partial aggregation, False otherwise.
+    */
+  def supportPartial: Boolean = false
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
new file mode 100644
index 0000000..234ecfb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+class AggregateAllTimeWindowFunction(
+    groupReduceFunction: RichGroupReduceFunction[Row, Row],
+    windowStartPos: Option[Int],
+    windowEndPos: Option[Int])
+  extends AggregateAllWindowFunction[TimeWindow](groupReduceFunction) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    super.open(parameters)
+  }
+
+  override def apply(window: TimeWindow, input: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // set collector and window
+    collector.wrappedCollector = out
+    collector.timeWindow = window
+
+    // call wrapped reduce function with property collector
+    super.apply(window, input, collector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAllWindowFunction.scala
new file mode 100644
index 0000000..10a06da
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAllWindowFunction.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+class AggregateAllWindowFunction[W <: Window](
+    groupReduceFunction: RichGroupReduceFunction[Row, Row])
+  extends RichAllWindowFunction[Row, Row, W] {
+
+  override def open(parameters: Configuration): Unit = {
+    groupReduceFunction.open(parameters)
+  }
+
+  override def apply(window: W, input: Iterable[Row], out: Collector[Row]): Unit = {
+    groupReduceFunction.reduce(input, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
new file mode 100644
index 0000000..21a96e0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Preconditions
+
+class AggregateMapFunction[IN, OUT](
+    private val aggregates: Array[Aggregate[_]],
+    private val aggFields: Array[Int],
+    private val groupingKeys: Array[Int],
+    @transient private val returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT] {
+  
+  private var output: Row = _
+  
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(aggFields)
+    Preconditions.checkArgument(aggregates.size == aggFields.size)
+    val partialRowLength = groupingKeys.length +
+        aggregates.map(_.intermediateDataType.length).sum
+    output = new Row(partialRowLength)
+  }
+
+  override def map(value: IN): OUT = {
+    
+    val input = value.asInstanceOf[Row]
+    for (i <- 0 until aggregates.length) {
+      val fieldValue = input.getField(aggFields(i))
+      aggregates(i).prepare(fieldValue, output)
+    }
+    for (i <- 0 until groupingKeys.length) {
+      output.setField(i, input.getField(groupingKeys(i)))
+    }
+    output.asInstanceOf[OUT]
+  }
+
+  override def getProducedType: TypeInformation[OUT] = {
+    returnType
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
new file mode 100644
index 0000000..31b85cd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
+ *
+ * @param aggregates   The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+ *                         and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+ *                         index in output Row.
+ */
+class AggregateReduceCombineFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int,
+    private val finalRowArity: Int)
+  extends AggregateReduceGroupFunction(
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping,
+    intermediateRowArity,
+    finalRowArity)
+  with CombineFunction[Row, Row] {
+
+  /**
+   * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+   *
+   * @param records  Sub-grouped intermediate aggregate Rows iterator.
+   * @return Combined intermediate aggregate Row.
+   *
+   */
+  override def combine(records: Iterable[Row]): Row = {
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(aggregateBuffer))
+
+    // Merge intermediate aggregate value to buffer.
+    var last: Row = null
+    records.foreach((record) => {
+      aggregates.foreach(_.merge(record, aggregateBuffer))
+      last = record
+    })
+
+    // Set group keys to aggregateBuffer.
+    for (i <- groupKeysMapping.indices) {
+      aggregateBuffer.setField(i, last.getField(i))
+    }
+
+    aggregateBuffer
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
new file mode 100644
index 0000000..c1efebb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+/**
+ * It wraps the aggregate logic inside of 
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+ *
+ * @param aggregates   The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
+ *                         and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+ *                         index in output Row.
+ */
+class AggregateReduceGroupFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int,
+    private val finalRowArity: Int)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  protected var aggregateBuffer: Row = _
+  private var output: Row = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    aggregateBuffer = new Row(intermediateRowArity)
+    output = new Row(finalRowArity)
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+   * calculate aggregated values output by aggregate buffer, and set them into output 
+   * Row based on the mapping relation between intermediate aggregate data and output data.
+   *
+   * @param records  Grouped intermediate aggregate Rows iterator.
+   * @param out The collector to hand results to.
+   *
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(aggregateBuffer))
+
+    // Merge intermediate aggregate value to buffer.
+    var last: Row = null
+    records.foreach((record) => {
+      aggregates.foreach(_.merge(record, aggregateBuffer))
+      last = record
+    })
+
+    // Set group keys value to final output.
+    groupKeysMapping.foreach {
+      case (after, previous) =>
+        output.setField(after, last.getField(previous))
+    }
+
+    // Evaluate final aggregate value and set to output.
+    aggregateMapping.foreach {
+      case (after, previous) =>
+        output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
+    }
+
+    out.collect(output)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateTimeWindowFunction.scala
new file mode 100644
index 0000000..b7419dd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateTimeWindowFunction.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+class AggregateTimeWindowFunction(
+    groupReduceFunction: RichGroupReduceFunction[Row, Row],
+    windowStartPos: Option[Int],
+    windowEndPos: Option[Int])
+  extends AggregateWindowFunction[TimeWindow](groupReduceFunction) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    super.open(parameters)
+  }
+
+  override def apply(
+    key: Tuple,
+    window: TimeWindow,
+    input: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    // set collector and window
+    collector.wrappedCollector = out
+    collector.timeWindow = window
+
+    // call wrapped reduce function with property collector
+    super.apply(key, window, input, collector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
new file mode 100644
index 0000000..282e6c0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
+import org.apache.calcite.sql.fun._
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+object AggregateUtil {
+
+  type CalcitePair[T, R] = org.apache.calcite.util.Pair[T, R]
+  type JavaList[T] = java.util.List[T]
+
+  /**
+    * Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
+    * The function returns intermediate aggregate values of all aggregate function which are
+    * organized by the following format:
+    *
+    * {{{
+    *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+    *                             |                          |
+    *                             v                          v
+    *        +---------+---------+--------+--------+--------+--------+
+    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+    *        +---------+---------+--------+--------+--------+--------+
+    *                                              ^
+    *                                              |
+    *                               sum(y) aggOffsetInRow = 4
+    * }}}
+    *
+    */
+  private[flink] def createPrepareMapFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    groupings: Array[Int],
+    inputType: RelDataType): MapFunction[Any, Row] = {
+
+    val (aggFieldIndexes,aggregates) = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      groupings.length)
+
+    val mapReturnType: RowTypeInfo =
+      createAggregateBufferDataType(groupings, aggregates, inputType)
+
+    val mapFunction = new AggregateMapFunction[Row, Row](
+      aggregates,
+      aggFieldIndexes,
+      groupings,
+      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+
+    mapFunction
+  }
+
+  /**
+    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] to compute aggregates.
+    * If all aggregates support partial aggregation, the
+    * [[org.apache.flink.api.common.functions.GroupReduceFunction]] implements
+    * [[org.apache.flink.api.common.functions.CombineFunction]] as well.
+    *
+    */
+  private[flink] def createAggregateGroupReduceFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int]): RichGroupReduceFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      groupings.length)._2
+
+    val (groupingOffsetMapping, aggOffsetMapping) =
+      getGroupingOffsetAndAggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)
+
+    val allPartialAggregate: Boolean = aggregates.forall(_.supportPartial)
+
+    val intermediateRowArity = groupings.length +
+      aggregates.map(_.intermediateDataType.length).sum
+
+    val groupReduceFunction =
+      if (allPartialAggregate) {
+        new AggregateReduceCombineFunction(
+          aggregates,
+          groupingOffsetMapping,
+          aggOffsetMapping,
+          intermediateRowArity,
+          outputType.getFieldCount)
+      }
+      else {
+        new AggregateReduceGroupFunction(
+          aggregates,
+          groupingOffsetMapping,
+          aggOffsetMapping,
+          intermediateRowArity,
+          outputType.getFieldCount)
+      }
+    groupReduceFunction
+  }
+
+  /**
+    * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
+    * aggregation.
+    *
+    */
+  private[flink] def createIncrementalAggregateReduceFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),inputType,groupings.length)._2
+
+    val groupingOffsetMapping =
+      getGroupingOffsetAndAggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)._1
+
+    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+    val reduceFunction = new IncrementalAggregateReduceFunction(
+      aggregates,
+      groupingOffsetMapping,
+      intermediateRowArity)
+    reduceFunction
+  }
+
+  /**
+    * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
+    */
+  private[flink] def createAllWindowAggregationFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int],
+    properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    val aggFunction =
+      createAggregateGroupReduceFunction(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+    } else {
+      new AggregateAllWindowFunction(aggFunction)
+    }
+  }
+
+  /**
+    * Create a [[WindowFunction]] to compute partitioned group window aggregates.
+    *
+    */
+  private[flink] def createWindowAggregationFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int],
+    properties: Seq[NamedWindowProperty])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+    val aggFunction =
+      createAggregateGroupReduceFunction(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+    } else {
+      new AggregateWindowFunction(aggFunction)
+    }
+  }
+
+  /**
+    * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
+    * window aggreagtes.
+    */
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int],
+    properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),inputType,groupings.length)._2
+
+    val (groupingOffsetMapping, aggOffsetMapping) =
+      getGroupingOffsetAndAggOffsetMapping(
+      namedAggregates,
+      inputType,
+      outputType,
+      groupings)
+
+    val finalRowArity = outputType.getFieldCount
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new IncrementalAggregateAllTimeWindowFunction(
+        aggregates,
+        groupingOffsetMapping,
+        aggOffsetMapping,
+        finalRowArity,
+        startPos,
+        endPos)
+      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+    } else {
+      new IncrementalAggregateAllWindowFunction(
+        aggregates,
+        groupingOffsetMapping,
+        aggOffsetMapping,
+        finalRowArity)
+    }
+  }
+
+  /**
+    * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
+    */
+  private[flink] def createWindowIncrementalAggregationFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int],
+    properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),inputType,groupings.length)._2
+
+    val (groupingOffsetMapping, aggOffsetMapping) =
+      getGroupingOffsetAndAggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)
+
+    val finalRowArity = outputType.getFieldCount
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new IncrementalAggregateTimeWindowFunction(
+        aggregates,
+        groupingOffsetMapping,
+        aggOffsetMapping,
+        finalRowArity,
+        startPos,
+        endPos)
+      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+    } else {
+      new IncrementalAggregateWindowFunction(
+        aggregates,
+        groupingOffsetMapping,
+        aggOffsetMapping,
+        finalRowArity)
+    }
+  }
+
+  /**
+    * Return true if all aggregates can be partially computed. False otherwise.
+    */
+  private[flink] def doAllSupportPartialAggregation(
+    aggregateCalls: Seq[AggregateCall],
+    inputType: RelDataType,
+    groupKeysCount: Int): Boolean = {
+    transformToAggregateFunctions(
+      aggregateCalls,
+      inputType,
+      groupKeysCount)._2.forall(_.supportPartial)
+  }
+
+  /**
+    * @return groupingOffsetMapping (mapping relation between field index of intermediate
+    *         aggregate Row and output Row.)
+    *         and aggOffsetMapping (the mapping relation between aggregate function index in list
+    *         and its corresponding field index in output Row.)
+    */
+  private def getGroupingOffsetAndAggOffsetMapping(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+    // the mapping relation between field index of intermediate aggregate Row and output Row.
+    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+    // the mapping relation between aggregate function index in list and its corresponding
+    // field index in output Row.
+    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
+
+    if (groupingOffsetMapping.length != groupings.length ||
+      aggOffsetMapping.length != namedAggregates.length) {
+      throw new TableException(
+        "Could not find output field in input data type " +
+          "or aggregate functions.")
+    }
+    (groupingOffsetMapping, aggOffsetMapping)
+  }
+
+  private def isTimeWindow(window: LogicalWindow) = {
+    window match {
+      case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType)
+      case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType)
+      case ProcessingTimeSessionGroupWindow(_, _) => true
+      case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType)
+      case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType)
+      case EventTimeSessionGroupWindow(_, _, _) => true
+    }
+  }
+
+  private def computeWindowStartEndPropertyPos(
+    properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int]) = {
+
+    val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
+      (p, x) => p match {
+        case NamedWindowProperty(name, prop) =>
+          prop match {
+            case WindowStart(_) if x._1.isDefined =>
+              throw new TableException("Duplicate WindowStart property encountered. This is a bug.")
+            case WindowStart(_) =>
+              (Some(x._3), x._2, x._3 - 1)
+            case WindowEnd(_) if x._2.isDefined =>
+              throw new TableException("Duplicate WindowEnd property encountered. This is a bug.")
+            case WindowEnd(_) =>
+              (x._1, Some(x._3), x._3 - 1)
+          }
+      }
+    }
+    (propPos._1, propPos._2)
+  }
+
+  private def transformToAggregateFunctions(
+    aggregateCalls: Seq[AggregateCall],
+    inputType: RelDataType,
+    groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = {
+
+    // store the aggregate fields of each aggregate function, by the same order of aggregates.
+    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
+    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
+
+    // set the start offset of aggregate buffer value to group keys' length, 
+    // as all the group keys would be moved to the start fields of intermediate
+    // aggregate data.
+    var aggOffset = groupKeysCount
+
+    // create aggregate function instances by function type and aggregate field data type.
+    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
+      val argList: util.List[Integer] = aggregateCall.getArgList
+      if (argList.isEmpty) {
+        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+          aggFieldIndexes(index) = 0
+        } else {
+          throw new TableException("Aggregate fields should not be empty.")
+        }
+      } else {
+        if (argList.size() > 1) {
+          throw new TableException("Currently, do not support aggregate on multi fields.")
+        }
+        aggFieldIndexes(index) = argList.get(0)
+      }
+      val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
+      aggregateCall.getAggregation match {
+        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
+          aggregates(index) = sqlTypeName match {
+            case TINYINT =>
+              new ByteSumAggregate
+            case SMALLINT =>
+              new ShortSumAggregate
+            case INTEGER =>
+              new IntSumAggregate
+            case BIGINT =>
+              new LongSumAggregate
+            case FLOAT =>
+              new FloatSumAggregate
+            case DOUBLE =>
+              new DoubleSumAggregate
+            case DECIMAL =>
+              new DecimalSumAggregate
+            case sqlType: SqlTypeName =>
+              throw new TableException("Sum aggregate does no support type:" + sqlType)
+          }
+        }
+        case _: SqlAvgAggFunction => {
+          aggregates(index) = sqlTypeName match {
+            case TINYINT =>
+               new ByteAvgAggregate
+            case SMALLINT =>
+              new ShortAvgAggregate
+            case INTEGER =>
+              new IntAvgAggregate
+            case BIGINT =>
+              new LongAvgAggregate
+            case FLOAT =>
+              new FloatAvgAggregate
+            case DOUBLE =>
+              new DoubleAvgAggregate
+            case DECIMAL =>
+              new DecimalAvgAggregate
+            case sqlType: SqlTypeName =>
+              throw new TableException("Avg aggregate does no support type:" + sqlType)
+          }
+        }
+        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
+          aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
+            sqlTypeName match {
+              case TINYINT =>
+                new ByteMinAggregate
+              case SMALLINT =>
+                new ShortMinAggregate
+              case INTEGER =>
+                new IntMinAggregate
+              case BIGINT =>
+                new LongMinAggregate
+              case FLOAT =>
+                new FloatMinAggregate
+              case DOUBLE =>
+                new DoubleMinAggregate
+              case DECIMAL =>
+                new DecimalMinAggregate
+              case BOOLEAN =>
+                new BooleanMinAggregate
+              case sqlType: SqlTypeName =>
+                throw new TableException("Min aggregate does no support type:" + sqlType)
+            }
+          } else {
+            sqlTypeName match {
+              case TINYINT =>
+                new ByteMaxAggregate
+              case SMALLINT =>
+                new ShortMaxAggregate
+              case INTEGER =>
+                new IntMaxAggregate
+              case BIGINT =>
+                new LongMaxAggregate
+              case FLOAT =>
+                new FloatMaxAggregate
+              case DOUBLE =>
+                new DoubleMaxAggregate
+              case DECIMAL =>
+                new DecimalMaxAggregate
+              case BOOLEAN =>
+                new BooleanMaxAggregate
+              case sqlType: SqlTypeName =>
+                throw new TableException("Max aggregate does no support type:" + sqlType)
+            }
+          }
+        }
+        case _: SqlCountAggFunction =>
+          aggregates(index) = new CountAggregate
+        case unSupported: SqlAggFunction =>
+          throw new TableException("unsupported Function: " + unSupported.getName)
+      }
+      setAggregateDataOffset(index)
+    }
+
+    // set the aggregate intermediate data start index in Row, and update current value.
+    def setAggregateDataOffset(index: Int): Unit = {
+      aggregates(index).setAggOffsetInRow(aggOffset)
+      aggOffset += aggregates(index).intermediateDataType.length
+    }
+
+    (aggFieldIndexes, aggregates)
+  }
+
+  private def createAggregateBufferDataType(
+    groupings: Array[Int],
+    aggregates: Array[Aggregate[_]],
+    inputType: RelDataType): RowTypeInfo = {
+
+    // get the field data types of group keys.
+    val groupingTypes: Seq[TypeInformation[_]] = groupings
+      .map(inputType.getFieldList.get(_).getType)
+      .map(FlinkTypeFactory.toTypeInfo)
+
+    val aggPartialNameSuffix = "agg_buffer_"
+    val factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+
+    // get all field data types of all intermediate aggregates
+    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
+
+    // concat group key types and aggregation types
+    val allFieldTypes = groupingTypes ++: aggTypes
+    val partialType = new RowTypeInfo(allFieldTypes: _*)
+    partialType
+  }
+
+  // Find the mapping between the index of aggregate list and aggregated value index in output Row.
+  private def getAggregateMapping(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    outputType: RelDataType): Array[(Int, Int)] = {
+
+    // the mapping relation between aggregate function index in list and its corresponding
+    // field index in output Row.
+    var aggOffsetMapping = ArrayBuffer[(Int, Int)]()
+
+    outputType.getFieldList.zipWithIndex.foreach{
+      case (outputFieldType, outputIndex) =>
+        namedAggregates.zipWithIndex.foreach {
+          case (namedAggCall, aggregateIndex) =>
+            if (namedAggCall.getValue.equals(outputFieldType.getName) &&
+                namedAggCall.getKey.getType.equals(outputFieldType.getType)) {
+              aggOffsetMapping += ((outputIndex, aggregateIndex))
+            }
+        }
+    }
+   
+    aggOffsetMapping.toArray
+  }
+
+  // Find the mapping between the index of group key in intermediate aggregate Row and its index
+  // in output Row.
+  private def getGroupKeysMapping(
+    inputDatType: RelDataType,
+    outputType: RelDataType,
+    groupKeys: Array[Int]): Array[(Int, Int)] = {
+
+    // the mapping relation between field index of intermediate aggregate Row and output Row.
+    var groupingOffsetMapping = ArrayBuffer[(Int, Int)]()
+
+    outputType.getFieldList.zipWithIndex.foreach {
+      case (outputFieldType, outputIndex) =>
+        inputDatType.getFieldList.zipWithIndex.foreach {
+          // find the field index in input data type.
+          case (inputFieldType, inputIndex) =>
+            if (outputFieldType.getName.equals(inputFieldType.getName) &&
+                outputFieldType.getType.equals(inputFieldType.getType)) {
+              // as aggregated field in output data type would not have a matched field in
+              // input data, so if inputIndex is not -1, it must be a group key. Then we can
+              // find the field index in buffer data by the group keys index mapping between
+              // input data and buffer data.
+              for (i <- groupKeys.indices) {
+                if (inputIndex == groupKeys(i)) {
+                  groupingOffsetMapping += ((outputIndex, i))
+                }
+              }
+            }
+        }
+    }
+
+    groupingOffsetMapping.toArray
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateWindowFunction.scala
new file mode 100644
index 0000000..5491b1d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateWindowFunction.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+class AggregateWindowFunction[W <: Window](groupReduceFunction: RichGroupReduceFunction[Row, Row])
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+  override def open(parameters: Configuration): Unit = {
+    groupReduceFunction.open(parameters)
+  }
+
+  override def apply(
+    key: Tuple,
+    window: W,
+    input: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    groupReduceFunction.reduce(input, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala
new file mode 100644
index 0000000..cb94ca1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AvgAggregate.scala
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import com.google.common.math.LongMath
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.types.Row
+import java.math.BigDecimal
+import java.math.BigInteger
+
+abstract class AvgAggregate[T] extends Aggregate[T] {
+  protected var partialSumIndex: Int = _
+  protected var partialCountIndex: Int = _
+
+  override def supportPartial: Boolean = true
+
+  override def setAggOffsetInRow(aggOffset: Int): Unit = {
+    partialSumIndex = aggOffset
+    partialCountIndex = aggOffset + 1
+  }
+}
+
+abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
+
+  override def initiate(partial: Row): Unit = {
+    partial.setField(partialSumIndex, 0L)
+    partial.setField(partialCountIndex, 0L)
+  }
+
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      partial.setField(partialSumIndex, 0L)
+      partial.setField(partialCountIndex, 0L)
+    } else {
+      doPrepare(value, partial)
+    }
+  }
+
+  override def merge(partial: Row, buffer: Row): Unit = {
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Long]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, bufferSum))
+    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
+  }
+
+  override def evaluate(buffer : Row): T = {
+    doEvaluate(buffer).asInstanceOf[T]
+  }
+
+  override def intermediateDataType = Array(
+    BasicTypeInfo.LONG_TYPE_INFO,
+    BasicTypeInfo.LONG_TYPE_INFO)
+
+  def doPrepare(value: Any, partial: Row): Unit
+
+  def doEvaluate(buffer: Row): Any
+}
+
+class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
+  override def doPrepare(value: Any, partial: Row): Unit = {
+    val input = value.asInstanceOf[Byte]
+    partial.setField(partialSumIndex, input.toLong)
+    partial.setField(partialCountIndex, 1L)
+  }
+
+  override def doEvaluate(buffer: Row): Any = {
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toByte
+    }
+  }
+}
+
+class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
+
+  override def doPrepare(value: Any, partial: Row): Unit = {
+    val input = value.asInstanceOf[Short]
+    partial.setField(partialSumIndex, input.toLong)
+    partial.setField(partialCountIndex, 1L)
+  }
+
+  override def doEvaluate(buffer: Row): Any = {
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toShort
+    }
+  }
+}
+
+class IntAvgAggregate extends IntegralAvgAggregate[Int] {
+
+  override def doPrepare(value: Any, partial: Row): Unit = {
+    val input = value.asInstanceOf[Int]
+    partial.setField(partialSumIndex, input.toLong)
+    partial.setField(partialCountIndex, 1L)
+  }
+
+  override def doEvaluate(buffer: Row): Any = {
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toInt
+    }
+  }
+}
+
+class LongAvgAggregate extends IntegralAvgAggregate[Long] {
+
+  override def intermediateDataType = Array(
+    BasicTypeInfo.BIG_INT_TYPE_INFO,
+    BasicTypeInfo.LONG_TYPE_INFO)
+
+  override def initiate(partial: Row): Unit = {
+    partial.setField(partialSumIndex, BigInteger.ZERO)
+    partial.setField(partialCountIndex, 0L)
+  }
+
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      partial.setField(partialSumIndex, BigInteger.ZERO)
+      partial.setField(partialCountIndex, 0L)
+    } else {
+      doPrepare(value, partial)
+    }
+  }
+
+  override def doPrepare(value: Any, partial: Row): Unit = {
+    val input = value.asInstanceOf[Long]
+    partial.setField(partialSumIndex, BigInteger.valueOf(input))
+    partial.setField(partialCountIndex, 1L)
+  }
+
+  override def merge(partial: Row, buffer: Row): Unit = {
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    buffer.setField(partialSumIndex, partialSum.add(bufferSum))
+    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
+  }
+
+  override def doEvaluate(buffer: Row): Any = {
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    if (bufferCount == 0L) {
+      null
+    } else {
+      bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+    }
+  }
+}
+
+abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] {
+
+  override def initiate(partial: Row): Unit = {
+    partial.setField(partialSumIndex, 0D)
+    partial.setField(partialCountIndex, 0L)
+  }
+
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      partial.setField(partialSumIndex, 0D)
+      partial.setField(partialCountIndex, 0L)
+    } else {
+      doPrepare(value, partial)
+    }
+  }
+
+  override def merge(partial: Row, buffer: Row): Unit = {
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+
+    buffer.setField(partialSumIndex, partialSum + bufferSum)
+    buffer.setField(partialCountIndex, partialCount + bufferCount)
+  }
+
+  override def evaluate(buffer : Row): T = {
+    doEvaluate(buffer).asInstanceOf[T]
+  }
+
+  override def intermediateDataType = Array(
+    BasicTypeInfo.DOUBLE_TYPE_INFO,
+    BasicTypeInfo.LONG_TYPE_INFO)
+
+  def doPrepare(value: Any, partial: Row): Unit
+
+  def doEvaluate(buffer: Row): Any
+}
+
+class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
+
+  override def doPrepare(value: Any, partial: Row): Unit = {
+    val input = value.asInstanceOf[Float]
+    partial.setField(partialSumIndex, input.toDouble)
+    partial.setField(partialCountIndex, 1L)
+  }
+
+
+  override def doEvaluate(buffer: Row): Any = {
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toFloat
+    }
+  }
+}
+
+class DoubleAvgAggregate extends FloatingAvgAggregate[Double] {
+
+  override def doPrepare(value: Any, partial: Row): Unit = {
+    val input = value.asInstanceOf[Double]
+    partial.setField(partialSumIndex, input)
+    partial.setField(partialCountIndex, 1L)
+  }
+
+  override def doEvaluate(buffer: Row): Any = {
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount)
+    }
+  }
+}
+
+class DecimalAvgAggregate extends AvgAggregate[BigDecimal] {
+
+  override def intermediateDataType = Array(
+    BasicTypeInfo.BIG_DEC_TYPE_INFO,
+    BasicTypeInfo.LONG_TYPE_INFO)
+
+  override def initiate(partial: Row): Unit = {
+    partial.setField(partialSumIndex, BigDecimal.ZERO)
+    partial.setField(partialCountIndex, 0L)
+  }
+
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      initiate(partial)
+    } else {
+      val input = value.asInstanceOf[BigDecimal]
+      partial.setField(partialSumIndex, input)
+      partial.setField(partialCountIndex, 1L)
+    }
+  }
+
+  override def merge(partial: Row, buffer: Row): Unit = {
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    buffer.setField(partialSumIndex, partialSum.add(bufferSum))
+    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
+  }
+
+  override def evaluate(buffer: Row): BigDecimal = {
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
+    if (bufferCount != 0) {
+      val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
+      bufferSum.divide(BigDecimal.valueOf(bufferCount))
+    } else {
+      null.asInstanceOf[BigDecimal]
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala
new file mode 100644
index 0000000..ea8e1d8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CountAggregate.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.types.Row
+
+class CountAggregate extends Aggregate[Long] {
+  private var countIndex: Int = _
+
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(countIndex, 0L)
+  }
+
+  override def merge(intermediate: Row, buffer: Row): Unit = {
+    val partialCount = intermediate.getField(countIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(countIndex).asInstanceOf[Long]
+    buffer.setField(countIndex, partialCount + bufferCount)
+  }
+
+  override def evaluate(buffer: Row): Long = {
+    buffer.getField(countIndex).asInstanceOf[Long]
+  }
+
+  override def prepare(value: Any, intermediate: Row): Unit = {
+    if (value == null) {
+      intermediate.setField(countIndex, 0L)
+    } else {
+      intermediate.setField(countIndex, 1L)
+    }
+  }
+
+  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
+
+  override def supportPartial: Boolean = true
+
+  override def setAggOffsetInRow(aggIndex: Int): Unit = {
+    countIndex = aggIndex
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
new file mode 100644
index 0000000..5d7a94b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+/**
+  *
+  * Computes the final aggregate value from incrementally computed aggreagtes.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity  The arity of the final output row.
+  */
+class IncrementalAggregateAllTimeWindowFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val finalRowArity: Int,
+    private val windowStartPos: Option[Int],
+    private val windowEndPos: Option[Int])
+  extends IncrementalAggregateAllWindowFunction[TimeWindow](
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping,
+    finalRowArity) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    super.open(parameters)
+  }
+
+  override def apply(
+    window: TimeWindow,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    // set collector and window
+    collector.wrappedCollector = out
+    collector.timeWindow = window
+
+    super.apply(window, records, collector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
new file mode 100644
index 0000000..3c41a62
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * Computes the final aggregate value from incrementally computed aggreagtes.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity  The arity of the final output row.
+  */
+class IncrementalAggregateAllWindowFunction[W <: Window](
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val finalRowArity: Int)
+  extends RichAllWindowFunction[Row, Row, W] {
+
+  private var output: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    output = new Row(finalRowArity)
+  }
+
+  /**
+    * Calculate aggregated values output by aggregate buffer, and set them into output
+    * Row based on the mapping relation between intermediate aggregate data and output data.
+    */
+  override def apply(
+    window: W,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    val iterator = records.iterator
+
+    if (iterator.hasNext) {
+      val record = iterator.next()
+      // Set group keys value to final output.
+      groupKeysMapping.foreach {
+        case (after, previous) =>
+          output.setField(after, record.getField(previous))
+      }
+      // Evaluate final aggregate value and set to output.
+      aggregateMapping.foreach {
+        case (after, previous) =>
+          output.setField(after, aggregates(previous).evaluate(record))
+      }
+      out.collect(output)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
new file mode 100644
index 0000000..14b44e8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * Incrementally computes group window aggregates.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int)
+  extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
+  /**
+    * For Incremental intermediate aggregate Rows, merge value1 and value2
+    * into aggregate buffer, return aggregate buffer.
+    *
+    * @param value1 The first value to combined.
+    * @param value2 The second value to combined.
+    * @return  accumulatorRow A resulting row that combines two input values.
+    *
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    // TODO: once FLINK-5105 is solved, we can avoid creating a new row for each invocation
+    //   and directly merge value1 and value2.
+    val accumulatorRow = new Row(intermediateRowArity)
+
+    // copy all fields of value1 into accumulatorRow
+    (0 until intermediateRowArity)
+    .foreach(i => accumulatorRow.setField(i, value1.getField(i)))
+    // merge value2 to accumulatorRow
+    aggregates.foreach(_.merge(value2, accumulatorRow))
+
+    accumulatorRow
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
new file mode 100644
index 0000000..a96ce7a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Computes the final aggregate value from incrementally computed aggreagtes.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity  The arity of the final output row.
+  */
+class IncrementalAggregateTimeWindowFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val finalRowArity: Int,
+    private val windowStartPos: Option[Int],
+    private val windowEndPos: Option[Int])
+  extends IncrementalAggregateWindowFunction[TimeWindow](
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping, finalRowArity) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    super.open(parameters)
+  }
+
+  override def apply(
+    key: Tuple,
+    window: TimeWindow,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    // set collector and window
+    collector.wrappedCollector = out
+    collector.timeWindow = window
+
+    super.apply(key, window, records, collector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
new file mode 100644
index 0000000..30f7a7b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * Computes the final aggregate value from incrementally computed aggreagtes.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity  The arity of the final output row.
+  */
+class IncrementalAggregateWindowFunction[W <: Window](
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val finalRowArity: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+  private var output: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    output = new Row(finalRowArity)
+  }
+
+  /**
+    * Calculate aggregated values output by aggregate buffer, and set them into output
+    * Row based on the mapping relation between intermediate aggregate data and output data.
+    */
+  override def apply(
+    key: Tuple,
+    window: W,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    val iterator = records.iterator
+
+    if (iterator.hasNext) {
+      val record = iterator.next()
+      // Set group keys value to final output.
+      groupKeysMapping.foreach {
+        case (after, previous) =>
+          output.setField(after, record.getField(previous))
+      }
+      // Evaluate final aggregate value and set to output.
+      aggregateMapping.foreach {
+        case (after, previous) =>
+          output.setField(after, aggregates(previous).evaluate(record))
+      }
+      out.collect(output)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala
new file mode 100644
index 0000000..34b25e0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MaxAggregate.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.types.Row
+
+abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
+
+  protected var maxIndex = -1
+
+  /**
+   * Initiate the intermediate aggregate value in Row.
+   *
+   * @param intermediate The intermediate aggregate row to initiate.
+   */
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(maxIndex, null)
+  }
+
+  /**
+   * Accessed in MapFunction, prepare the input of partial aggregate.
+   *
+   * @param value
+   * @param intermediate
+   */
+  override def prepare(value: Any, intermediate: Row): Unit = {
+    if (value == null) {
+      initiate(intermediate)
+    } else {
+      intermediate.setField(maxIndex, value)
+    }
+  }
+
+  /**
+   * Accessed in CombineFunction and GroupReduceFunction, merge partial
+   * aggregate result into aggregate buffer.
+   *
+   * @param intermediate
+   * @param buffer
+   */
+  override def merge(intermediate: Row, buffer: Row): Unit = {
+    val partialValue = intermediate.getField(maxIndex).asInstanceOf[T]
+    if (partialValue != null) {
+      val bufferValue = buffer.getField(maxIndex).asInstanceOf[T]
+      if (bufferValue != null) {
+        val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
+        buffer.setField(maxIndex, max)
+      } else {
+        buffer.setField(maxIndex, partialValue)
+      }
+    }
+  }
+
+  /**
+   * Return the final aggregated result based on aggregate buffer.
+   *
+   * @param buffer
+   * @return
+   */
+  override def evaluate(buffer: Row): T = {
+    buffer.getField(maxIndex).asInstanceOf[T]
+  }
+
+  override def supportPartial: Boolean = true
+
+  override def setAggOffsetInRow(aggOffset: Int): Unit = {
+    maxIndex = aggOffset
+  }
+}
+
+class ByteMaxAggregate extends MaxAggregate[Byte] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
+
+}
+
+class ShortMaxAggregate extends MaxAggregate[Short] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
+
+}
+
+class IntMaxAggregate extends MaxAggregate[Int] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
+
+}
+
+class LongMaxAggregate extends MaxAggregate[Long] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
+
+}
+
+class FloatMaxAggregate extends MaxAggregate[Float] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
+
+}
+
+class DoubleMaxAggregate extends MaxAggregate[Double] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
+
+}
+
+class BooleanMaxAggregate extends MaxAggregate[Boolean] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
+
+}
+
+class DecimalMaxAggregate extends Aggregate[BigDecimal] {
+
+  protected var minIndex: Int = _
+
+  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
+
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(minIndex, null)
+  }
+
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      initiate(partial)
+    } else {
+      partial.setField(minIndex, value)
+    }
+  }
+
+  override def merge(partial: Row, buffer: Row): Unit = {
+    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
+    if (partialValue != null) {
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
+      if (bufferValue != null) {
+        val min = if (partialValue.compareTo(bufferValue) > 0) partialValue else bufferValue
+        buffer.setField(minIndex, min)
+      } else {
+        buffer.setField(minIndex, partialValue)
+      }
+    }
+  }
+
+  override def evaluate(buffer: Row): BigDecimal = {
+    buffer.getField(minIndex).asInstanceOf[BigDecimal]
+  }
+
+  override def supportPartial: Boolean = true
+
+  override def setAggOffsetInRow(aggOffset: Int): Unit = {
+    minIndex = aggOffset
+  }
+}