Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:00 UTC

[31/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
deleted file mode 100644
index 273aa60..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.types.Row
-
-/**
- * The interface for all Flink aggregate functions, which are expressed in terms of initiate(),
- * prepare(), merge() and evaluate(). The aggregate functions are executed in two phases:
- * -- In the Map phase, prepare() transforms the aggregate field value into an intermediate
- * aggregate value.
- * -- In the GroupReduce phase, merge() merges the grouped intermediate aggregate values
- * into the aggregate buffer. Then evaluate() calculates the final aggregated value.
- * Associative, decomposable aggregate functions support partial aggregation. To optimize
- * performance, a Combine phase can be added between the Map and GroupReduce phases:
- * -- In the Combine phase, merge() merges sub-grouped intermediate aggregate values
- * into the aggregate buffer.
- *
- * The intermediate aggregate value is stored inside a Row. aggOffsetInRow is the start
- * field index in the Row, so different aggregate functions can share the same Row as their
- * intermediate aggregate value/aggregate buffer, because their values are stored in distinct
- * fields of the Row without conflict. The intermediate aggregate value must be a sequence of
- * JVM primitives, and Flink uses intermediateDataType() to get its data types on the SQL side.
- *
- * @tparam T Aggregated value type.
- */
-trait Aggregate[T] extends Serializable {
-
-  /**
-    * Transform the aggregate field value into intermediate aggregate data.
-    *
-    * @param value The value to insert into the intermediate aggregate row.
-    * @param intermediate The intermediate aggregate row into which the value is inserted.
-    */
-  def prepare(value: Any, intermediate: Row): Unit
-
-  /**
-    * Initiate the intermediate aggregate value in Row.
-    *
-    * @param intermediate The intermediate aggregate row to initiate.
-    */
-  def initiate(intermediate: Row): Unit
-
-  /**
-    * Merge intermediate aggregate data into aggregate buffer.
-    *
-    * @param intermediate The intermediate aggregate row to merge.
-    * @param buffer The aggregate buffer into which the intermediate is merged.
-    */
-  def merge(intermediate: Row, buffer: Row): Unit
-
-  /**
-    * Calculate the final aggregated result based on aggregate buffer.
-    *
-    * @param buffer The aggregate buffer from which the final aggregate is computed.
-    * @return The final result of the aggregate.
-    */
-  def evaluate(buffer: Row): T
-
-  /**
-    * Intermediate aggregate value types.
-    *
-    * @return The types of the intermediate fields of this aggregate.
-    */
-  def intermediateDataType: Array[TypeInformation[_]]
-
-  /**
-    * Set the aggregate data offset in Row.
-    *
-    * @param aggOffset The offset of this aggregate in the intermediate aggregate rows.
-    */
-  def setAggOffsetInRow(aggOffset: Int)
-
-  /**
-    * Whether the aggregate function supports partial aggregation.
-    *
-    * @return True if the aggregate supports partial aggregation, False otherwise.
-    */
-  def supportPartial: Boolean = false
-}

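For context, a minimal sketch of an implementation of the Aggregate trait deleted above. The class below is hypothetical (it is not part of this commit) and only illustrates the prepare()/merge()/evaluate() contract and the use of setAggOffsetInRow():

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.types.Row

    // Hypothetical sum aggregate over Long values with a single Long buffer field.
    class ExampleLongSumAggregate extends Aggregate[Long] {

      private var sumIndex: Int = _

      // Map phase: write the input value into the shared intermediate Row.
      override def prepare(value: Any, intermediate: Row): Unit = {
        val v = if (value == null) 0L else value.asInstanceOf[Long]
        intermediate.setField(sumIndex, v)
      }

      override def initiate(intermediate: Row): Unit =
        intermediate.setField(sumIndex, 0L)

      // GroupReduce/Combine phase: add the partial sum into the aggregate buffer.
      override def merge(intermediate: Row, buffer: Row): Unit = {
        val partial = intermediate.getField(sumIndex).asInstanceOf[Long]
        val acc = buffer.getField(sumIndex).asInstanceOf[Long]
        buffer.setField(sumIndex, acc + partial)
      }

      override def evaluate(buffer: Row): Long =
        buffer.getField(sumIndex).asInstanceOf[Long]

      override def intermediateDataType: Array[TypeInformation[_]] =
        Array(BasicTypeInfo.LONG_TYPE_INFO)

      // The planner assigns this aggregate's slice of the shared Row.
      override def setAggOffsetInRow(aggOffset: Int): Unit =
        sumIndex = aggOffset

      // Sum is associative and decomposable, so partial aggregation is safe.
      override def supportPartial: Boolean = true
    }
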
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
deleted file mode 100644
index 4c473d4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.util.Collector
-
-class AggregateAllTimeWindowFunction(
-    groupReduceFunction: RichGroupReduceFunction[Row, Row],
-    windowStartPos: Option[Int],
-    windowEndPos: Option[Int])
-  extends AggregateAllWindowFunction[TimeWindow](groupReduceFunction) {
-
-  private var collector: TimeWindowPropertyCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
-    super.open(parameters)
-  }
-
-  override def apply(window: TimeWindow, input: Iterable[Row], out: Collector[Row]): Unit = {
-
-    // set collector and window
-    collector.wrappedCollector = out
-    collector.timeWindow = window
-
-    // call wrapped reduce function with property collector
-    super.apply(window, input, collector)
-  }
-}

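TimeWindowPropertyCollector itself is not included in this excerpt. A sketch of the collaboration, under the assumption (hypothetical) that windowStartPos/windowEndPos address the output fields that receive the window's start and end timestamps:

    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Hypothetical stand-in for TimeWindowPropertyCollector: it decorates the
    // downstream collector and stamps window properties onto each emitted Row.
    class ExampleWindowPropertyCollector(
        windowStartPos: Option[Int],
        windowEndPos: Option[Int])
      extends Collector[Row] {

      var wrappedCollector: Collector[Row] = _
      var timeWindow: TimeWindow = _

      override def collect(record: Row): Unit = {
        // Assumption: the positions are resolved field indexes of the record.
        windowStartPos.foreach(pos => record.setField(pos, timeWindow.getStart))
        windowEndPos.foreach(pos => record.setField(pos, timeWindow.getEnd))
        wrappedCollector.collect(record)
      }

      override def close(): Unit = wrappedCollector.close()
    }
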
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
deleted file mode 100644
index db5f477..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-class AggregateAllWindowFunction[W <: Window](
-    groupReduceFunction: RichGroupReduceFunction[Row, Row])
-  extends RichAllWindowFunction[Row, Row, W] {
-
-  override def open(parameters: Configuration): Unit = {
-    groupReduceFunction.open(parameters)
-  }
-
-  override def apply(window: W, input: Iterable[Row], out: Collector[Row]): Unit = {
-    groupReduceFunction.reduce(input, out)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
deleted file mode 100644
index 0699bfa..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Preconditions
-
-class AggregateMapFunction[IN, OUT](
-    private val aggregates: Array[Aggregate[_]],
-    private val aggFields: Array[Int],
-    private val groupingKeys: Array[Int],
-    @transient private val returnType: TypeInformation[OUT])
-  extends RichMapFunction[IN, OUT]
-  with ResultTypeQueryable[OUT] {
-  
-  private var output: Row = _
-  
-  override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(aggFields)
-    Preconditions.checkArgument(aggregates.size == aggFields.size)
-    val partialRowLength = groupingKeys.length +
-        aggregates.map(_.intermediateDataType.length).sum
-    output = new Row(partialRowLength)
-  }
-
-  override def map(value: IN): OUT = {
-    
-    val input = value.asInstanceOf[Row]
-    for (i <- 0 until aggregates.length) {
-      val fieldValue = input.getField(aggFields(i))
-      aggregates(i).prepare(fieldValue, output)
-    }
-    for (i <- 0 until groupingKeys.length) {
-      output.setField(i, input.getField(groupingKeys(i)))
-    }
-    output.asInstanceOf[OUT]
-  }
-
-  override def getProducedType: TypeInformation[OUT] = {
-    returnType
-  }
-}

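A worked illustration of the Row layout that map() produces, with a hypothetical query and schema:

    // Hypothetical query: SELECT a, AVG(b), COUNT(c) FROM t GROUP BY a
    //
    //   groupingKeys = Array(0)           -> input field a copied to output field 0
    //   AVG(b):   intermediateDataType = (sum, count), aggOffsetInRow = 1
    //   COUNT(c): intermediateDataType = (count),      aggOffsetInRow = 3
    //
    //   input Row:  | a | b | c |
    //   output Row: | a | sum(b) | count(b) | count(c) |
    //
    //   partialRowLength = 1 group key + 2 (avg) + 1 (count) = 4
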
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
deleted file mode 100644
index b2cf07e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.{Collector, Preconditions}
-
-import scala.collection.JavaConversions._
-
-
-/**
- * It wraps the aggregate logic inside
- * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
- * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
- *
- * @param aggregates   The aggregate functions.
- * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
- *                         and output Row.
- * @param aggregateMapping The index mapping between aggregate function list and aggregated value
- *                         index in output Row.
- */
-class AggregateReduceCombineFunction(
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val intermediateRowArity: Int,
-    private val finalRowArity: Int)
-  extends AggregateReduceGroupFunction(
-    aggregates,
-    groupKeysMapping,
-    aggregateMapping,
-    intermediateRowArity,
-    finalRowArity)
-  with CombineFunction[Row, Row] {
-
-  /**
-   * For sub-grouped intermediate aggregate Rows, merge all of them into the aggregate buffer.
-   *
-   * @param records  Sub-grouped intermediate aggregate Rows iterator.
-   * @return Combined intermediate aggregate Row.
-   *
-   */
-  override def combine(records: Iterable[Row]): Row = {
-
-    // Initiate intermediate aggregate value.
-    aggregates.foreach(_.initiate(aggregateBuffer))
-
-    // Merge intermediate aggregate value to buffer.
-    var last: Row = null
-    records.foreach((record) => {
-      aggregates.foreach(_.merge(record, aggregateBuffer))
-      last = record
-    })
-
-    // Set group keys to aggregateBuffer.
-    for (i <- groupKeysMapping.indices) {
-      aggregateBuffer.setField(i, last.getField(i))
-    }
-
-    aggregateBuffer
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
deleted file mode 100644
index 6fe712b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.{Collector, Preconditions}
-
-import scala.collection.JavaConversions._
-
-/**
- * It wraps the aggregate logic inside
- * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
- *
- * @param aggregates   The aggregate functions.
- * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
- *                         and output Row.
- * @param aggregateMapping The index mapping between aggregate function list and aggregated value
- *                         index in output Row.
- */
-class AggregateReduceGroupFunction(
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val intermediateRowArity: Int,
-    private val finalRowArity: Int)
-  extends RichGroupReduceFunction[Row, Row] {
-
-  protected var aggregateBuffer: Row = _
-  private var output: Row = _
-
-  override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
-    aggregateBuffer = new Row(intermediateRowArity)
-    output = new Row(finalRowArity)
-  }
-
-  /**
-   * For grouped intermediate aggregate Rows, merge all of them into the aggregate buffer,
-   * calculate the final aggregate values from the aggregate buffer, and set them into the
-   * output Row based on the mapping between intermediate aggregate data and output data.
-   *
-   * @param records  Grouped intermediate aggregate Rows iterator.
-   * @param out The collector to hand results to.
-   *
-   */
-  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
-
-    // Initiate intermediate aggregate value.
-    aggregates.foreach(_.initiate(aggregateBuffer))
-
-    // Merge intermediate aggregate value to buffer.
-    var last: Row = null
-    records.foreach((record) => {
-      aggregates.foreach(_.merge(record, aggregateBuffer))
-      last = record
-    })
-
-    // Set group keys value to final output.
-    groupKeysMapping.foreach {
-      case (after, previous) =>
-        output.setField(after, last.getField(previous))
-    }
-
-    // Evaluate final aggregate value and set to output.
-    aggregateMapping.foreach {
-      case (after, previous) =>
-        output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
-    }
-
-    out.collect(output)
-  }
-}

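A short trace of the two index mappings used by reduce(), with a hypothetical output schema:

    // Hypothetical output row: | a | SUM(b) |, with group key a at buffer field 0.
    //
    //   groupKeysMapping = Array((0, 0))  // output field 0 <- buffer field 0
    //   aggregateMapping = Array((1, 0))  // output field 1 <- aggregates(0).evaluate(buffer)
    //
    // reduce() merges every intermediate Row of the group into aggregateBuffer,
    // copies the keys via groupKeysMapping, evaluates via aggregateMapping, and
    // emits exactly one output Row per group.
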
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
deleted file mode 100644
index ff8f6fb..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.util.Collector
-
-class AggregateTimeWindowFunction(
-    groupReduceFunction: RichGroupReduceFunction[Row, Row],
-    windowStartPos: Option[Int],
-    windowEndPos: Option[Int])
-  extends AggregateWindowFunction[TimeWindow](groupReduceFunction) {
-
-  private var collector: TimeWindowPropertyCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
-    super.open(parameters)
-  }
-
-  override def apply(
-    key: Tuple,
-    window: TimeWindow,
-    input: Iterable[Row],
-    out: Collector[Row]): Unit = {
-
-    // set collector and window
-    collector.wrappedCollector = out
-    collector.timeWindow = window
-
-    // call wrapped reduce function with property collector
-    super.apply(key, window, input, collector)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
deleted file mode 100644
index a181068..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ /dev/null
@@ -1,593 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.util
-
-import org.apache.calcite.rel.`type`._
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
-import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.expressions.{WindowEnd, WindowStart}
-import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
-object AggregateUtil {
-
-  type CalcitePair[T, R] = org.apache.calcite.util.Pair[T, R]
-  type JavaList[T] = java.util.List[T]
-
-  /**
-    * Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
-    * The function returns the intermediate aggregate values of all aggregate functions,
-    * organized in the following format:
-    *
-    * {{{
-    *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
-    *                             |                          |
-    *                             v                          v
-    *        +---------+---------+--------+--------+--------+--------+
-    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-    *        +---------+---------+--------+--------+--------+--------+
-    *                                              ^
-    *                                              |
-    *                               sum(y) aggOffsetInRow = 4
-    * }}}
-    *
-    */
-  private[flink] def createPrepareMapFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    groupings: Array[Int],
-    inputType: RelDataType): MapFunction[Any, Row] = {
-
-    val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
-      namedAggregates.map(_.getKey),
-      inputType,
-      groupings.length)
-
-    val mapReturnType: RowTypeInfo =
-      createAggregateBufferDataType(groupings, aggregates, inputType)
-
-    val mapFunction = new AggregateMapFunction[Row, Row](
-      aggregates,
-      aggFieldIndexes,
-      groupings,
-      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-    mapFunction
-  }
-
-  /**
-    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] to compute aggregates.
-    * If all aggregates support partial aggregation, the
-    * [[org.apache.flink.api.common.functions.GroupReduceFunction]] implements
-    * [[org.apache.flink.api.common.functions.CombineFunction]] as well.
-    *
-    */
-  private[flink] def createAggregateGroupReduceFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int]): RichGroupReduceFunction[Row, Row] = {
-
-    val aggregates = transformToAggregateFunctions(
-      namedAggregates.map(_.getKey),
-      inputType,
-      groupings.length)._2
-
-    val (groupingOffsetMapping, aggOffsetMapping) =
-      getGroupingOffsetAndAggOffsetMapping(
-        namedAggregates,
-        inputType,
-        outputType,
-        groupings)
-
-    val allPartialAggregate: Boolean = aggregates.forall(_.supportPartial)
-
-    val intermediateRowArity = groupings.length +
-      aggregates.map(_.intermediateDataType.length).sum
-
-    val groupReduceFunction =
-      if (allPartialAggregate) {
-        new AggregateReduceCombineFunction(
-          aggregates,
-          groupingOffsetMapping,
-          aggOffsetMapping,
-          intermediateRowArity,
-          outputType.getFieldCount)
-      }
-      else {
-        new AggregateReduceGroupFunction(
-          aggregates,
-          groupingOffsetMapping,
-          aggOffsetMapping,
-          intermediateRowArity,
-          outputType.getFieldCount)
-      }
-    groupReduceFunction
-  }
-
-  /**
-    * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
-    * aggregation.
-    *
-    */
-  private[flink] def createIncrementalAggregateReduceFunction(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int]): IncrementalAggregateReduceFunction = {
-
-    val aggregates = transformToAggregateFunctions(
-      namedAggregates.map(_.getKey), inputType, groupings.length)._2
-
-    val groupingOffsetMapping =
-      getGroupingOffsetAndAggOffsetMapping(
-        namedAggregates,
-        inputType,
-        outputType,
-        groupings)._1
-
-    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
-    val reduceFunction = new IncrementalAggregateReduceFunction(
-      aggregates,
-      groupingOffsetMapping,
-      intermediateRowArity)
-    reduceFunction
-  }
-
-  /**
-    * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
-    */
-  private[flink] def createAllWindowAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty])
-  : AllWindowFunction[Row, Row, DataStreamWindow] = {
-
-    val aggFunction =
-      createAggregateGroupReduceFunction(
-        namedAggregates,
-        inputType,
-        outputType,
-        groupings)
-
-    if (isTimeWindow(window)) {
-      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
-      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
-      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
-    } else {
-      new AggregateAllWindowFunction(aggFunction)
-    }
-  }
-
-  /**
-    * Create a [[WindowFunction]] to compute partitioned group window aggregates.
-    *
-    */
-  private[flink] def createWindowAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty])
-  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
-
-    val aggFunction =
-      createAggregateGroupReduceFunction(
-        namedAggregates,
-        inputType,
-        outputType,
-        groupings)
-
-    if (isTimeWindow(window)) {
-      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
-      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
-      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
-    } else {
-      new AggregateWindowFunction(aggFunction)
-    }
-  }
-
-  /**
-    * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
-    * window aggregates.
-    */
-  private[flink] def createAllWindowIncrementalAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
-
-    val aggregates = transformToAggregateFunctions(
-      namedAggregates.map(_.getKey), inputType, groupings.length)._2
-
-    val (groupingOffsetMapping, aggOffsetMapping) =
-      getGroupingOffsetAndAggOffsetMapping(
-      namedAggregates,
-      inputType,
-      outputType,
-      groupings)
-
-    val finalRowArity = outputType.getFieldCount
-
-    if (isTimeWindow(window)) {
-      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
-      new IncrementalAggregateAllTimeWindowFunction(
-        aggregates,
-        groupingOffsetMapping,
-        aggOffsetMapping,
-        finalRowArity,
-        startPos,
-        endPos)
-      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
-    } else {
-      new IncrementalAggregateAllWindowFunction(
-        aggregates,
-        groupingOffsetMapping,
-        aggOffsetMapping,
-        finalRowArity)
-    }
-  }
-
-  /**
-    * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
-    */
-  private[flink] def createWindowIncrementalAggregationFunction(
-    window: LogicalWindow,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int],
-    properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
-
-    val aggregates = transformToAggregateFunctions(
-      namedAggregates.map(_.getKey), inputType, groupings.length)._2
-
-    val (groupingOffsetMapping, aggOffsetMapping) =
-      getGroupingOffsetAndAggOffsetMapping(
-        namedAggregates,
-        inputType,
-        outputType,
-        groupings)
-
-    val finalRowArity = outputType.getFieldCount
-
-    if (isTimeWindow(window)) {
-      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
-      new IncrementalAggregateTimeWindowFunction(
-        aggregates,
-        groupingOffsetMapping,
-        aggOffsetMapping,
-        finalRowArity,
-        startPos,
-        endPos)
-      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
-    } else {
-      new IncrementalAggregateWindowFunction(
-        aggregates,
-        groupingOffsetMapping,
-        aggOffsetMapping,
-        finalRowArity)
-    }
-  }
-
-  /**
-    * Returns true if all aggregates can be partially computed, false otherwise.
-    */
-  private[flink] def doAllSupportPartialAggregation(
-    aggregateCalls: Seq[AggregateCall],
-    inputType: RelDataType,
-    groupKeysCount: Int): Boolean = {
-    transformToAggregateFunctions(
-      aggregateCalls,
-      inputType,
-      groupKeysCount)._2.forall(_.supportPartial)
-  }
-
-  /**
-    * @return groupingOffsetMapping (the mapping between field indexes of the intermediate
-    *         aggregate Row and the output Row)
-    *         and aggOffsetMapping (the mapping between an aggregate function's index in the
-    *         list and its corresponding field index in the output Row)
-    */
-  private def getGroupingOffsetAndAggOffsetMapping(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    outputType: RelDataType,
-    groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
-
-    // the mapping relation between field index of intermediate aggregate Row and output Row.
-    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
-
-    // the mapping relation between aggregate function index in list and its corresponding
-    // field index in output Row.
-    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-    if (groupingOffsetMapping.length != groupings.length ||
-      aggOffsetMapping.length != namedAggregates.length) {
-      throw new TableException(
-        "Could not find output field in input data type " +
-          "or aggregate functions.")
-    }
-    (groupingOffsetMapping, aggOffsetMapping)
-  }
-
-  private def isTimeWindow(window: LogicalWindow) = {
-    window match {
-      case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType)
-      case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType)
-      case ProcessingTimeSessionGroupWindow(_, _) => true
-      case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType)
-      case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType)
-      case EventTimeSessionGroupWindow(_, _, _) => true
-    }
-  }
-
-  private def computeWindowStartEndPropertyPos(
-    properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int]) = {
-
-    val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
-      (p, x) => p match {
-        case NamedWindowProperty(name, prop) =>
-          prop match {
-            case WindowStart(_) if x._1.isDefined =>
-              throw new TableException("Duplicate WindowStart property encountered. This is a bug.")
-            case WindowStart(_) =>
-              (Some(x._3), x._2, x._3 - 1)
-            case WindowEnd(_) if x._2.isDefined =>
-              throw new TableException("Duplicate WindowEnd property encountered. This is a bug.")
-            case WindowEnd(_) =>
-              (x._1, Some(x._3), x._3 - 1)
-          }
-      }
-    }
-    (propPos._1, propPos._2)
-  }
-
-  private def transformToAggregateFunctions(
-    aggregateCalls: Seq[AggregateCall],
-    inputType: RelDataType,
-    groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = {
-
-    // store the aggregate fields of each aggregate function, in the same order as the aggregates.
-    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
-    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
-
-    // set the start offset of the aggregate buffer values to the number of group keys,
-    // as all group keys are copied to the leading fields of the intermediate
-    // aggregate data.
-    var aggOffset = groupKeysCount
-
-    // create aggregate function instances by function type and aggregate field data type.
-    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
-      val argList: util.List[Integer] = aggregateCall.getArgList
-      if (argList.isEmpty) {
-        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-          aggFieldIndexes(index) = 0
-        } else {
-          throw new TableException("Aggregate fields should not be empty.")
-        }
-      } else {
-        if (argList.size() > 1) {
-          throw new TableException("Currently, do not support aggregate on multi fields.")
-        }
-        aggFieldIndexes(index) = argList.get(0)
-      }
-      val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
-      aggregateCall.getAggregation match {
-        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
-          aggregates(index) = sqlTypeName match {
-            case TINYINT =>
-              new ByteSumAggregate
-            case SMALLINT =>
-              new ShortSumAggregate
-            case INTEGER =>
-              new IntSumAggregate
-            case BIGINT =>
-              new LongSumAggregate
-            case FLOAT =>
-              new FloatSumAggregate
-            case DOUBLE =>
-              new DoubleSumAggregate
-            case DECIMAL =>
-              new DecimalSumAggregate
-            case sqlType: SqlTypeName =>
-              throw new TableException("Sum aggregate does no support type:" + sqlType)
-          }
-        }
-        case _: SqlAvgAggFunction => {
-          aggregates(index) = sqlTypeName match {
-            case TINYINT =>
-               new ByteAvgAggregate
-            case SMALLINT =>
-              new ShortAvgAggregate
-            case INTEGER =>
-              new IntAvgAggregate
-            case BIGINT =>
-              new LongAvgAggregate
-            case FLOAT =>
-              new FloatAvgAggregate
-            case DOUBLE =>
-              new DoubleAvgAggregate
-            case DECIMAL =>
-              new DecimalAvgAggregate
-            case sqlType: SqlTypeName =>
-              throw new TableException("Avg aggregate does no support type:" + sqlType)
-          }
-        }
-        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
-          aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
-            sqlTypeName match {
-              case TINYINT =>
-                new ByteMinAggregate
-              case SMALLINT =>
-                new ShortMinAggregate
-              case INTEGER =>
-                new IntMinAggregate
-              case BIGINT =>
-                new LongMinAggregate
-              case FLOAT =>
-                new FloatMinAggregate
-              case DOUBLE =>
-                new DoubleMinAggregate
-              case DECIMAL =>
-                new DecimalMinAggregate
-              case BOOLEAN =>
-                new BooleanMinAggregate
-              case sqlType: SqlTypeName =>
-                throw new TableException("Min aggregate does no support type:" + sqlType)
-            }
-          } else {
-            sqlTypeName match {
-              case TINYINT =>
-                new ByteMaxAggregate
-              case SMALLINT =>
-                new ShortMaxAggregate
-              case INTEGER =>
-                new IntMaxAggregate
-              case BIGINT =>
-                new LongMaxAggregate
-              case FLOAT =>
-                new FloatMaxAggregate
-              case DOUBLE =>
-                new DoubleMaxAggregate
-              case DECIMAL =>
-                new DecimalMaxAggregate
-              case BOOLEAN =>
-                new BooleanMaxAggregate
-              case sqlType: SqlTypeName =>
-                throw new TableException("Max aggregate does no support type:" + sqlType)
-            }
-          }
-        }
-        case _: SqlCountAggFunction =>
-          aggregates(index) = new CountAggregate
-        case unSupported: SqlAggFunction =>
-          throw new TableException("unsupported Function: " + unSupported.getName)
-      }
-      setAggregateDataOffset(index)
-    }
-
-    // set the start index of this aggregate's intermediate data in the Row, and advance the offset.
-    def setAggregateDataOffset(index: Int): Unit = {
-      aggregates(index).setAggOffsetInRow(aggOffset)
-      aggOffset += aggregates(index).intermediateDataType.length
-    }
-
-    (aggFieldIndexes, aggregates)
-  }
-
-  private def createAggregateBufferDataType(
-    groupings: Array[Int],
-    aggregates: Array[Aggregate[_]],
-    inputType: RelDataType): RowTypeInfo = {
-
-    // get the field data types of group keys.
-    val groupingTypes: Seq[TypeInformation[_]] = groupings
-      .map(inputType.getFieldList.get(_).getType)
-      .map(FlinkTypeFactory.toTypeInfo)
-
-    val aggPartialNameSuffix = "agg_buffer_"
-    val factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
-
-    // get all field data types of all intermediate aggregates
-    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
-
-    // concat group key types and aggregation types
-    val allFieldTypes = groupingTypes ++: aggTypes
-    val partialType = new RowTypeInfo(allFieldTypes: _*)
-    partialType
-  }
-
-  // Find the mapping between an aggregate's index in the list and its value's index in the output Row.
-  private def getAggregateMapping(
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    outputType: RelDataType): Array[(Int, Int)] = {
-
-    // the mapping relation between aggregate function index in list and its corresponding
-    // field index in output Row.
-    var aggOffsetMapping = ArrayBuffer[(Int, Int)]()
-
-    outputType.getFieldList.zipWithIndex.foreach{
-      case (outputFieldType, outputIndex) =>
-        namedAggregates.zipWithIndex.foreach {
-          case (namedAggCall, aggregateIndex) =>
-            if (namedAggCall.getValue.equals(outputFieldType.getName) &&
-                namedAggCall.getKey.getType.equals(outputFieldType.getType)) {
-              aggOffsetMapping += ((outputIndex, aggregateIndex))
-            }
-        }
-    }
-   
-    aggOffsetMapping.toArray
-  }
-
-  // Find the mapping between a group key's index in the intermediate aggregate Row and its
-  // index in the output Row.
-  private def getGroupKeysMapping(
-    inputDataType: RelDataType,
-    outputType: RelDataType,
-    groupKeys: Array[Int]): Array[(Int, Int)] = {
-
-    // the mapping relation between field index of intermediate aggregate Row and output Row.
-    var groupingOffsetMapping = ArrayBuffer[(Int, Int)]()
-
-    outputType.getFieldList.zipWithIndex.foreach {
-      case (outputFieldType, outputIndex) =>
-        inputDataType.getFieldList.zipWithIndex.foreach {
-          // find the field index in input data type.
-          case (inputFieldType, inputIndex) =>
-            if (outputFieldType.getName.equals(inputFieldType.getName) &&
-                outputFieldType.getType.equals(inputFieldType.getType)) {
-            // An aggregated field in the output type has no matching field in the input
-            // type, so a field that matches by name and type must be a group key. We can
-            // then find its field index in the buffer data via the group key index mapping
-            // between input data and buffer data.
-              for (i <- groupKeys.indices) {
-                if (inputIndex == groupKeys(i)) {
-                  groupingOffsetMapping += ((outputIndex, i))
-                }
-              }
-            }
-        }
-    }
-
-    groupingOffsetMapping.toArray
-  }
-}
-

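computeWindowStartEndPropertyPos above hands out positions relative to the end of the output row, walking the properties from the right. A small trace, assuming the (hypothetical) property names w$start and w$end:

    // properties = Seq(NamedWindowProperty("w$start", WindowStart(...)),
    //                  NamedWindowProperty("w$end",   WindowEnd(...)))
    //
    // foldRight starts from (None, None, 0) and visits "w$end" first:
    //   WindowEnd   -> (None,     Some(0), -1)
    //   WindowStart -> (Some(-1), Some(0), -2)
    //
    // Result: (startPos, endPos) = (Some(-1), Some(0)), i.e. both properties are
    // addressed backwards from the last field of the output row.
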
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
deleted file mode 100644
index 4e77549..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-class AggregateWindowFunction[W <: Window](groupReduceFunction: RichGroupReduceFunction[Row, Row])
-  extends RichWindowFunction[Row, Row, Tuple, W] {
-
-  override def open(parameters: Configuration): Unit = {
-    groupReduceFunction.open(parameters)
-  }
-
-  override def apply(
-    key: Tuple,
-    window: W,
-    input: Iterable[Row],
-    out: Collector[Row]): Unit = {
-
-    groupReduceFunction.reduce(input, out)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
deleted file mode 100644
index 998ae62..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import com.google.common.math.LongMath
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-import java.math.BigDecimal
-import java.math.BigInteger
-
-abstract class AvgAggregate[T] extends Aggregate[T] {
-  protected var partialSumIndex: Int = _
-  protected var partialCountIndex: Int = _
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    partialSumIndex = aggOffset
-    partialCountIndex = aggOffset + 1
-  }
-}
-
-abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, 0L)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      partial.setField(partialSumIndex, 0L)
-      partial.setField(partialCountIndex, 0L)
-    } else {
-      doPrepare(value, partial)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Long]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def evaluate(buffer: Row): T = {
-    doEvaluate(buffer).asInstanceOf[T]
-  }
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  def doPrepare(value: Any, partial: Row): Unit
-
-  def doEvaluate(buffer: Row): Any
-}
-
-class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Byte]
-    partial.setField(partialSumIndex, input.toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount).toByte
-    }
-  }
-}
-
-class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Short]
-    partial.setField(partialSumIndex, input.toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount).toShort
-    }
-  }
-}
-
-class IntAvgAggregate extends IntegralAvgAggregate[Int] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Int]
-    partial.setField(partialSumIndex, input.toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount).toInt
-    }
-  }
-}
-
-class LongAvgAggregate extends IntegralAvgAggregate[Long] {
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.BIG_INT_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, BigInteger.ZERO)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      partial.setField(partialSumIndex, BigInteger.ZERO)
-      partial.setField(partialCountIndex, 0L)
-    } else {
-      doPrepare(value, partial)
-    }
-  }
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Long]
-    partial.setField(partialSumIndex, BigInteger.valueOf(input))
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigInteger]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    buffer.setField(partialSumIndex, partialSum.add(bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
-    }
-  }
-}
-
-abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] {
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, 0D)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      partial.setField(partialSumIndex, 0D)
-      partial.setField(partialCountIndex, 0L)
-    } else {
-      doPrepare(value, partial)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-
-    buffer.setField(partialSumIndex, partialSum + bufferSum)
-    buffer.setField(partialCountIndex, partialCount + bufferCount)
-  }
-
-  override def evaluate(buffer : Row): T = {
-    doEvaluate(buffer).asInstanceOf[T]
-  }
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.DOUBLE_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  def doPrepare(value: Any, partial: Row): Unit
-
-  def doEvaluate(buffer: Row): Any
-}
-
-class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Float]
-    partial.setField(partialSumIndex, input.toDouble)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount).toFloat
-    }
-  }
-}
-
-class DoubleAvgAggregate extends FloatingAvgAggregate[Double] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    val input = value.asInstanceOf[Double]
-    partial.setField(partialSumIndex, input)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount == 0L) {
-      null
-    } else {
-      (bufferSum / bufferCount)
-    }
-  }
-}
-
-class DecimalAvgAggregate extends AvgAggregate[BigDecimal] {
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.BIG_DEC_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, BigDecimal.ZERO)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      val input = value.asInstanceOf[BigDecimal]
-      partial.setField(partialSumIndex, input)
-      partial.setField(partialCountIndex, 1L)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    buffer.setField(partialSumIndex, partialSum.add(bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def evaluate(buffer: Row): BigDecimal = {
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount != 0) {
-      val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
-      bufferSum.divide(BigDecimal.valueOf(bufferCount))
-    } else {
-      null.asInstanceOf[BigDecimal]
-    }
-  }
-
-}
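
For reference, the removed avg aggregates are driven in three phases: prepare() builds a partial row per input value, merge() folds partials into a buffer, and evaluate() produces the final value. A minimal sketch of that lifecycle, assuming the pre-move package and that AvgAggregate maps the offset to the sum field and offset + 1 to the count field:

// Sketch only; after this refactoring the classes live in org.apache.flink.table.runtime.aggregate.
import org.apache.flink.api.table.runtime.aggregate.IntAvgAggregate
import org.apache.flink.types.Row

object AvgAggregateSketch {
  def main(args: Array[String]): Unit = {
    val agg = new IntAvgAggregate
    agg.setAggOffsetInRow(0) // assumption: sum at field 0, count at field 1

    // Map phase: one partial row per input value; a null input contributes count 0.
    val partials = Seq[Any](1, 2, null, 7).map { v =>
      val partial = new Row(2)
      agg.prepare(v, partial)
      partial
    }

    // Combine/GroupReduce phase: merge all partials into a single buffer.
    val buffer = new Row(2)
    agg.initiate(buffer)
    partials.foreach(p => agg.merge(p, buffer))

    // Evaluate: (1 + 2 + 7) / 3 = 3 (integral avg truncates).
    println(agg.evaluate(buffer))
  }
}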

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
deleted file mode 100644
index 4d6d20b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.types.Row
-
-class CountAggregate extends Aggregate[Long] {
-  private var countIndex: Int = _
-
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(countIndex, 0L)
-  }
-
-  override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialCount = intermediate.getField(countIndex).asInstanceOf[Long]
-    val bufferCount = buffer.getField(countIndex).asInstanceOf[Long]
-    buffer.setField(countIndex, partialCount + bufferCount)
-  }
-
-  override def evaluate(buffer: Row): Long = {
-    buffer.getField(countIndex).asInstanceOf[Long]
-  }
-
-  override def prepare(value: Any, intermediate: Row): Unit = {
-    if (value == null) {
-      intermediate.setField(countIndex, 0L)
-    } else {
-      intermediate.setField(countIndex, 1L)
-    }
-  }
-
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggIndex: Int): Unit = {
-    countIndex = aggIndex
-  }
-}
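
The subtle part of CountAggregate is prepare(): a null input contributes 0 to the count rather than 1, matching SQL COUNT semantics. A hedged usage sketch (pre-move package assumed):

import org.apache.flink.api.table.runtime.aggregate.CountAggregate
import org.apache.flink.types.Row

object CountAggregateSketch {
  def main(args: Array[String]): Unit = {
    val count = new CountAggregate
    count.setAggOffsetInRow(0)

    val buffer = new Row(1)
    count.initiate(buffer)

    // Each input becomes a partial row (0 for null, 1 otherwise) and is merged in.
    for (v <- Seq[Any]("a", null, "b")) {
      val partial = new Row(1)
      count.prepare(v, partial)
      count.merge(partial, buffer)
    }

    println(count.evaluate(buffer)) // 2
  }
}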

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
deleted file mode 100644
index 48e2313..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
-import org.apache.flink.util.Collector
-/**
-  * Computes the final aggregate value from incrementally computed aggregates.
-  *
-  * @param aggregates   The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row.
-  * @param finalRowArity  The arity of the final output row.
-  * @param windowStartPos The optional position of the window start timestamp in the output Row.
-  * @param windowEndPos The optional position of the window end timestamp in the output Row.
-  */
-class IncrementalAggregateAllTimeWindowFunction(
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val finalRowArity: Int,
-    private val windowStartPos: Option[Int],
-    private val windowEndPos: Option[Int])
-  extends IncrementalAggregateAllWindowFunction[TimeWindow](
-    aggregates,
-    groupKeysMapping,
-    aggregateMapping,
-    finalRowArity) {
-
-  private var collector: TimeWindowPropertyCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
-    super.open(parameters)
-  }
-
-  override def apply(
-    window: TimeWindow,
-    records: Iterable[Row],
-    out: Collector[Row]): Unit = {
-
-    // set collector and window
-    collector.wrappedCollector = out
-    collector.timeWindow = window
-
-    super.apply(window, records, collector)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
deleted file mode 100644
index 1a85dca..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
-  * Computes the final aggregate value from incrementally computed aggregates.
-  *
-  * @param aggregates   The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row.
-  * @param finalRowArity  The arity of the final output row.
-  */
-class IncrementalAggregateAllWindowFunction[W <: Window](
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val finalRowArity: Int)
-  extends RichAllWindowFunction[Row, Row, W] {
-
-  private var output: Row = _
-
-  override def open(parameters: Configuration): Unit = {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
-    output = new Row(finalRowArity)
-  }
-
-  /**
-    * Calculates the final aggregate values from the aggregate buffer and sets them into the
-    * output Row, based on the mapping between intermediate aggregate data and output data.
-    */
-  override def apply(
-    window: W,
-    records: Iterable[Row],
-    out: Collector[Row]): Unit = {
-
-    val iterator = records.iterator
-
-    if (iterator.hasNext) {
-      val record = iterator.next()
-      // Set group keys value to final output.
-      groupKeysMapping.foreach {
-        case (after, previous) =>
-          output.setField(after, record.getField(previous))
-      }
-      // Evaluate final aggregate value and set to output.
-      aggregateMapping.foreach {
-        case (after, previous) =>
-          output.setField(after, aggregates(previous).evaluate(record))
-      }
-      out.collect(output)
-    }
-  }
-}
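
The two mapping arrays are (outputIndex, sourceIndex) pairs: groupKeysMapping copies key fields straight through, while aggregateMapping routes each aggregate's evaluate() result to its output position. An illustrative sketch of that routing outside of any window (the CountAggregate and row layout below are assumptions):

import org.apache.flink.api.table.runtime.aggregate.CountAggregate
import org.apache.flink.types.Row

object MappingSketch {
  def main(args: Array[String]): Unit = {
    // Assumed intermediate row layout: [groupKey, countBuffer]
    val count = new CountAggregate
    count.setAggOffsetInRow(1)

    val record = new Row(2)
    record.setField(0, "k")
    record.setField(1, 5L)

    val groupKeysMapping = Array((0, 0)) // output(0) <- record(0)
    val aggregateMapping = Array((1, 0)) // output(1) <- aggregates(0).evaluate(record)
    val aggregates = Array(count)

    val output = new Row(2)
    groupKeysMapping.foreach { case (after, previous) =>
      output.setField(after, record.getField(previous))
    }
    aggregateMapping.foreach { case (after, previous) =>
      output.setField(after, aggregates(previous).evaluate(record))
    }

    println(output) // k,5
  }
}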

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
deleted file mode 100644
index 5c36821..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
-
-/**
-  * Incrementally computes group window aggregates.
-  *
-  * @param aggregates   The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param intermediateRowArity The arity of the intermediate aggregate rows.
-  */
-class IncrementalAggregateReduceFunction(
-    private val aggregates: Array[Aggregate[_]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val intermediateRowArity: Int)
-  extends ReduceFunction[Row] {
-
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(groupKeysMapping)
-
-  /**
-    * Merges two incremental intermediate aggregate Rows into a new aggregate
-    * buffer and returns that buffer.
-    *
-    * @param value1 The first value to be combined.
-    * @param value2 The second value to be combined.
-    * @return A resulting row that combines the two input values.
-    */
-  override def reduce(value1: Row, value2: Row): Row = {
-
-    // TODO: once FLINK-5105 is solved, we can avoid creating a new row for each invocation
-    //   and directly merge value1 and value2.
-    val accumulatorRow = new Row(intermediateRowArity)
-
-    // copy all fields of value1 into accumulatorRow
-    (0 until intermediateRowArity)
-    .foreach(i => accumulatorRow.setField(i, value1.getField(i)))
-    // merge value2 to accumulatorRow
-    aggregates.foreach(_.merge(value2, accumulatorRow))
-
-    accumulatorRow
-  }
-}
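
Until FLINK-5105 is resolved, reduce() allocates a fresh accumulator row per call: it copies value1 field by field and then merges value2 into it via each aggregate. A sketch with a single CountAggregate at offset 1 and the group key at field 0 (layout assumed):

import org.apache.flink.api.table.runtime.aggregate.{Aggregate, CountAggregate, IncrementalAggregateReduceFunction}
import org.apache.flink.types.Row

object ReduceSketch {
  def main(args: Array[String]): Unit = {
    val count = new CountAggregate
    count.setAggOffsetInRow(1)
    val aggregates: Array[Aggregate[_]] = Array(count)

    val reduce = new IncrementalAggregateReduceFunction(aggregates, Array((0, 0)), 2)

    val r1 = new Row(2); r1.setField(0, "k"); r1.setField(1, 3L)
    val r2 = new Row(2); r2.setField(0, "k"); r2.setField(1, 4L)

    // value1's fields are copied, then value2's count is merged on top: 3 + 4.
    println(reduce.reduce(r1, r2).getField(1)) // 7
  }
}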

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
deleted file mode 100644
index 2513383..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.util.Collector
-
-/**
-  * Computes the final aggregate value from incrementally computed aggregates.
-  *
-  * @param aggregates   The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row.
-  * @param finalRowArity  The arity of the final output row.
-  * @param windowStartPos The optional position of the window start timestamp in the output Row.
-  * @param windowEndPos The optional position of the window end timestamp in the output Row.
-  */
-class IncrementalAggregateTimeWindowFunction(
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val finalRowArity: Int,
-    private val windowStartPos: Option[Int],
-    private val windowEndPos: Option[Int])
-  extends IncrementalAggregateWindowFunction[TimeWindow](
-    aggregates,
-    groupKeysMapping,
-    aggregateMapping, finalRowArity) {
-
-  private var collector: TimeWindowPropertyCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
-    super.open(parameters)
-  }
-
-  override def apply(
-    key: Tuple,
-    window: TimeWindow,
-    records: Iterable[Row],
-    out: Collector[Row]): Unit = {
-
-    // set collector and window
-    collector.wrappedCollector = out
-    collector.timeWindow = window
-
-    super.apply(key, window, records, collector)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
deleted file mode 100644
index d0d71ee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
-  * Computes the final aggregate value from incrementally computed aggregates.
-  *
-  * @param aggregates   The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row.
-  * @param finalRowArity  The arity of the final output row.
-  */
-class IncrementalAggregateWindowFunction[W <: Window](
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val finalRowArity: Int)
-  extends RichWindowFunction[Row, Row, Tuple, W] {
-
-  private var output: Row = _
-
-  override def open(parameters: Configuration): Unit = {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
-    output = new Row(finalRowArity)
-  }
-
-  /**
-    * Calculates the final aggregate values from the aggregate buffer and sets them into the
-    * output Row, based on the mapping between intermediate aggregate data and output data.
-    */
-  override def apply(
-    key: Tuple,
-    window: W,
-    records: Iterable[Row],
-    out: Collector[Row]): Unit = {
-
-    val iterator = records.iterator
-
-    if (iterator.hasNext) {
-      val record = iterator.next()
-      // Set group keys value to final output.
-      groupKeysMapping.foreach {
-        case (after, previous) =>
-          output.setField(after, record.getField(previous))
-      }
-      // Evaluate final aggregate value and set to output.
-      aggregateMapping.foreach {
-        case (after, previous) =>
-          output.setField(after, aggregates(previous).evaluate(record))
-      }
-      out.collect(output)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
deleted file mode 100644
index 2cb3dc7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
-
-  protected var maxIndex = -1
-
-  /**
-   * Initiate the intermediate aggregate value in Row.
-   *
-   * @param intermediate The intermediate aggregate row to initiate.
-   */
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, null)
-  }
-
-  /**
-   * Accessed in MapFunction, prepares the input value for the partial aggregate.
-   *
-   * @param value The input value to aggregate.
-   * @param intermediate The intermediate aggregate row into which the value is set.
-   */
-  override def prepare(value: Any, intermediate: Row): Unit = {
-    if (value == null) {
-      initiate(intermediate)
-    } else {
-      intermediate.setField(maxIndex, value)
-    }
-  }
-
-  /**
-   * Accessed in CombineFunction and GroupReduceFunction, merges the partial
-   * aggregate result into the aggregate buffer.
-   *
-   * @param intermediate The partial aggregate row to merge.
-   * @param buffer The aggregate buffer into which the partial result is merged.
-   */
-  override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialValue = intermediate.getField(maxIndex).asInstanceOf[T]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(maxIndex).asInstanceOf[T]
-      if (bufferValue != null) {
-        val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
-        buffer.setField(maxIndex, max)
-      } else {
-        buffer.setField(maxIndex, partialValue)
-      }
-    }
-  }
-
-  /**
-   * Return the final aggregated result based on aggregate buffer.
-   *
-   * @param buffer
-   * @return
-   */
-  override def evaluate(buffer: Row): T = {
-    buffer.getField(maxIndex).asInstanceOf[T]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    maxIndex = aggOffset
-  }
-}
-
-class ByteMaxAggregate extends MaxAggregate[Byte] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-
-}
-
-class ShortMaxAggregate extends MaxAggregate[Short] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-
-}
-
-class IntMaxAggregate extends MaxAggregate[Int] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-
-}
-
-class LongMaxAggregate extends MaxAggregate[Long] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-
-}
-
-class FloatMaxAggregate extends MaxAggregate[Float] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-
-}
-
-class DoubleMaxAggregate extends MaxAggregate[Double] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-
-}
-
-class BooleanMaxAggregate extends MaxAggregate[Boolean] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
-
-}
-
-class DecimalMaxAggregate extends Aggregate[BigDecimal] {
-
-  protected var maxIndex: Int = _
-
-  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, null)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      partial.setField(maxIndex, value)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.getField(maxIndex).asInstanceOf[BigDecimal]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(maxIndex).asInstanceOf[BigDecimal]
-      if (bufferValue != null) {
-        val max = if (partialValue.compareTo(bufferValue) > 0) partialValue else bufferValue
-        buffer.setField(maxIndex, max)
-      } else {
-        buffer.setField(maxIndex, partialValue)
-      }
-    }
-  }
-
-  override def evaluate(buffer: Row): BigDecimal = {
-    buffer.getField(maxIndex).asInstanceOf[BigDecimal]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    maxIndex = aggOffset
-  }
-}
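
The merge() contract above treats null as "no value seen yet", so null partials never overwrite a real maximum. A usage sketch with IntMaxAggregate (pre-move package assumed):

import org.apache.flink.api.table.runtime.aggregate.IntMaxAggregate
import org.apache.flink.types.Row

object MaxAggregateSketch {
  def main(args: Array[String]): Unit = {
    val max = new IntMaxAggregate
    max.setAggOffsetInRow(0)

    val buffer = new Row(1)
    max.initiate(buffer) // buffer starts at null: no value seen yet

    for (v <- Seq[Any](3, null, 9, 5)) {
      val partial = new Row(1)
      max.prepare(v, partial) // a null input leaves the partial at null
      max.merge(partial, buffer) // null partials are skipped
    }

    println(max.evaluate(buffer)) // 9
  }
}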