You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:00 UTC
[31/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
deleted file mode 100644
index 273aa60..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.types.Row
-
-/**
- * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
- * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
- * -- In Map phase, use prepare() to transform aggregate field value into intermediate
- * aggregate value.
- * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
- * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
- * For associative decomposable aggregate functions, they support partial aggregate. To optimize
- * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
- * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
- * into aggregate buffer.
- *
- * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
- * field index in Row, so different aggregate functions could share the same Row as intermediate
- * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
- * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
- * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
- *
- * @tparam T Aggregated value type.
- */
-trait Aggregate[T] extends Serializable {
-
- /**
- * Transform the aggregate field value into intermediate aggregate data.
- *
- * @param value The value to insert into the intermediate aggregate row.
- * @param intermediate The intermediate aggregate row into which the value is inserted.
- */
- def prepare(value: Any, intermediate: Row): Unit
-
- /**
- * Initiate the intermediate aggregate value in Row.
- *
- * @param intermediate The intermediate aggregate row to initiate.
- */
- def initiate(intermediate: Row): Unit
-
- /**
- * Merge intermediate aggregate data into aggregate buffer.
- *
- * @param intermediate The intermediate aggregate row to merge.
- * @param buffer The aggregate buffer into which the intermedidate is merged.
- */
- def merge(intermediate: Row, buffer: Row): Unit
-
- /**
- * Calculate the final aggregated result based on aggregate buffer.
- *
- * @param buffer The aggregate buffer from which the final aggregate is computed.
- * @return The final result of the aggregate.
- */
- def evaluate(buffer: Row): T
-
- /**
- * Intermediate aggregate value types.
- *
- * @return The types of the intermediate fields of this aggregate.
- */
- def intermediateDataType: Array[TypeInformation[_]]
-
- /**
- * Set the aggregate data offset in Row.
- *
- * @param aggOffset The offset of this aggregate in the intermediate aggregate rows.
- */
- def setAggOffsetInRow(aggOffset: Int)
-
- /**
- * Whether aggregate function support partial aggregate.
- *
- * @return True if the aggregate supports partial aggregation, False otherwise.
- */
- def supportPartial: Boolean = false
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
deleted file mode 100644
index 4c473d4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.util.Collector
-
-class AggregateAllTimeWindowFunction(
- groupReduceFunction: RichGroupReduceFunction[Row, Row],
- windowStartPos: Option[Int],
- windowEndPos: Option[Int])
- extends AggregateAllWindowFunction[TimeWindow](groupReduceFunction) {
-
- private var collector: TimeWindowPropertyCollector = _
-
- override def open(parameters: Configuration): Unit = {
- collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
- super.open(parameters)
- }
-
- override def apply(window: TimeWindow, input: Iterable[Row], out: Collector[Row]): Unit = {
-
- // set collector and window
- collector.wrappedCollector = out
- collector.timeWindow = window
-
- // call wrapped reduce function with property collector
- super.apply(window, input, collector)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
deleted file mode 100644
index db5f477..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-class AggregateAllWindowFunction[W <: Window](
- groupReduceFunction: RichGroupReduceFunction[Row, Row])
- extends RichAllWindowFunction[Row, Row, W] {
-
- override def open(parameters: Configuration): Unit = {
- groupReduceFunction.open(parameters)
- }
-
- override def apply(window: W, input: Iterable[Row], out: Collector[Row]): Unit = {
- groupReduceFunction.reduce(input, out)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
deleted file mode 100644
index 0699bfa..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Preconditions
-
-class AggregateMapFunction[IN, OUT](
- private val aggregates: Array[Aggregate[_]],
- private val aggFields: Array[Int],
- private val groupingKeys: Array[Int],
- @transient private val returnType: TypeInformation[OUT])
- extends RichMapFunction[IN, OUT]
- with ResultTypeQueryable[OUT] {
-
- private var output: Row = _
-
- override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(aggFields)
- Preconditions.checkArgument(aggregates.size == aggFields.size)
- val partialRowLength = groupingKeys.length +
- aggregates.map(_.intermediateDataType.length).sum
- output = new Row(partialRowLength)
- }
-
- override def map(value: IN): OUT = {
-
- val input = value.asInstanceOf[Row]
- for (i <- 0 until aggregates.length) {
- val fieldValue = input.getField(aggFields(i))
- aggregates(i).prepare(fieldValue, output)
- }
- for (i <- 0 until groupingKeys.length) {
- output.setField(i, input.getField(groupingKeys(i)))
- }
- output.asInstanceOf[OUT]
- }
-
- override def getProducedType: TypeInformation[OUT] = {
- returnType
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
deleted file mode 100644
index b2cf07e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.{Collector, Preconditions}
-
-import scala.collection.JavaConversions._
-
-
-/**
- * It wraps the aggregate logic inside of
- * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
- * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
- *
- * @param aggregates The aggregate functions.
- * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
- * and output Row.
- * @param aggregateMapping The index mapping between aggregate function list and aggregated value
- * index in output Row.
- */
-class AggregateReduceCombineFunction(
- private val aggregates: Array[Aggregate[_ <: Any]],
- private val groupKeysMapping: Array[(Int, Int)],
- private val aggregateMapping: Array[(Int, Int)],
- private val intermediateRowArity: Int,
- private val finalRowArity: Int)
- extends AggregateReduceGroupFunction(
- aggregates,
- groupKeysMapping,
- aggregateMapping,
- intermediateRowArity,
- finalRowArity)
- with CombineFunction[Row, Row] {
-
- /**
- * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
- *
- * @param records Sub-grouped intermediate aggregate Rows iterator.
- * @return Combined intermediate aggregate Row.
- *
- */
- override def combine(records: Iterable[Row]): Row = {
-
- // Initiate intermediate aggregate value.
- aggregates.foreach(_.initiate(aggregateBuffer))
-
- // Merge intermediate aggregate value to buffer.
- var last: Row = null
- records.foreach((record) => {
- aggregates.foreach(_.merge(record, aggregateBuffer))
- last = record
- })
-
- // Set group keys to aggregateBuffer.
- for (i <- groupKeysMapping.indices) {
- aggregateBuffer.setField(i, last.getField(i))
- }
-
- aggregateBuffer
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
deleted file mode 100644
index 6fe712b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.{Collector, Preconditions}
-
-import scala.collection.JavaConversions._
-
-/**
- * It wraps the aggregate logic inside of
- * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
- *
- * @param aggregates The aggregate functions.
- * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
- * and output Row.
- * @param aggregateMapping The index mapping between aggregate function list and aggregated value
- * index in output Row.
- */
-class AggregateReduceGroupFunction(
- private val aggregates: Array[Aggregate[_ <: Any]],
- private val groupKeysMapping: Array[(Int, Int)],
- private val aggregateMapping: Array[(Int, Int)],
- private val intermediateRowArity: Int,
- private val finalRowArity: Int)
- extends RichGroupReduceFunction[Row, Row] {
-
- protected var aggregateBuffer: Row = _
- private var output: Row = _
-
- override def open(config: Configuration) {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(groupKeysMapping)
- aggregateBuffer = new Row(intermediateRowArity)
- output = new Row(finalRowArity)
- }
-
- /**
- * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
- * calculate aggregated values output by aggregate buffer, and set them into output
- * Row based on the mapping relation between intermediate aggregate data and output data.
- *
- * @param records Grouped intermediate aggregate Rows iterator.
- * @param out The collector to hand results to.
- *
- */
- override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
-
- // Initiate intermediate aggregate value.
- aggregates.foreach(_.initiate(aggregateBuffer))
-
- // Merge intermediate aggregate value to buffer.
- var last: Row = null
- records.foreach((record) => {
- aggregates.foreach(_.merge(record, aggregateBuffer))
- last = record
- })
-
- // Set group keys value to final output.
- groupKeysMapping.foreach {
- case (after, previous) =>
- output.setField(after, last.getField(previous))
- }
-
- // Evaluate final aggregate value and set to output.
- aggregateMapping.foreach {
- case (after, previous) =>
- output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
- }
-
- out.collect(output)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
deleted file mode 100644
index ff8f6fb..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.util.Collector
-
-class AggregateTimeWindowFunction(
- groupReduceFunction: RichGroupReduceFunction[Row, Row],
- windowStartPos: Option[Int],
- windowEndPos: Option[Int])
- extends AggregateWindowFunction[TimeWindow](groupReduceFunction) {
-
- private var collector: TimeWindowPropertyCollector = _
-
- override def open(parameters: Configuration): Unit = {
- collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
- super.open(parameters)
- }
-
- override def apply(
- key: Tuple,
- window: TimeWindow,
- input: Iterable[Row],
- out: Collector[Row]): Unit = {
-
- // set collector and window
- collector.wrappedCollector = out
- collector.timeWindow = window
-
- // call wrapped reduce function with property collector
- super.apply(key, window, input, collector)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
deleted file mode 100644
index a181068..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ /dev/null
@@ -1,593 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.util
-
-import org.apache.calcite.rel.`type`._
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
-import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.expressions.{WindowEnd, WindowStart}
-import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
-object AggregateUtil {
-
- type CalcitePair[T, R] = org.apache.calcite.util.Pair[T, R]
- type JavaList[T] = java.util.List[T]
-
- /**
- * Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
- * The function returns intermediate aggregate values of all aggregate function which are
- * organized by the following format:
- *
- * {{{
- * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5
- * | |
- * v v
- * +---------+---------+--------+--------+--------+--------+
- * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 |
- * +---------+---------+--------+--------+--------+--------+
- * ^
- * |
- * sum(y) aggOffsetInRow = 4
- * }}}
- *
- */
- private[flink] def createPrepareMapFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- groupings: Array[Int],
- inputType: RelDataType): MapFunction[Any, Row] = {
-
- val (aggFieldIndexes,aggregates) = transformToAggregateFunctions(
- namedAggregates.map(_.getKey),
- inputType,
- groupings.length)
-
- val mapReturnType: RowTypeInfo =
- createAggregateBufferDataType(groupings, aggregates, inputType)
-
- val mapFunction = new AggregateMapFunction[Row, Row](
- aggregates,
- aggFieldIndexes,
- groupings,
- mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
- mapFunction
- }
-
- /**
- * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] to compute aggregates.
- * If all aggregates support partial aggregation, the
- * [[org.apache.flink.api.common.functions.GroupReduceFunction]] implements
- * [[org.apache.flink.api.common.functions.CombineFunction]] as well.
- *
- */
- private[flink] def createAggregateGroupReduceFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int]): RichGroupReduceFunction[Row, Row] = {
-
- val aggregates = transformToAggregateFunctions(
- namedAggregates.map(_.getKey),
- inputType,
- groupings.length)._2
-
- val (groupingOffsetMapping, aggOffsetMapping) =
- getGroupingOffsetAndAggOffsetMapping(
- namedAggregates,
- inputType,
- outputType,
- groupings)
-
- val allPartialAggregate: Boolean = aggregates.forall(_.supportPartial)
-
- val intermediateRowArity = groupings.length +
- aggregates.map(_.intermediateDataType.length).sum
-
- val groupReduceFunction =
- if (allPartialAggregate) {
- new AggregateReduceCombineFunction(
- aggregates,
- groupingOffsetMapping,
- aggOffsetMapping,
- intermediateRowArity,
- outputType.getFieldCount)
- }
- else {
- new AggregateReduceGroupFunction(
- aggregates,
- groupingOffsetMapping,
- aggOffsetMapping,
- intermediateRowArity,
- outputType.getFieldCount)
- }
- groupReduceFunction
- }
-
- /**
- * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
- * aggregation.
- *
- */
- private[flink] def createIncrementalAggregateReduceFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int]): IncrementalAggregateReduceFunction = {
-
- val aggregates = transformToAggregateFunctions(
- namedAggregates.map(_.getKey),inputType,groupings.length)._2
-
- val groupingOffsetMapping =
- getGroupingOffsetAndAggOffsetMapping(
- namedAggregates,
- inputType,
- outputType,
- groupings)._1
-
- val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
- val reduceFunction = new IncrementalAggregateReduceFunction(
- aggregates,
- groupingOffsetMapping,
- intermediateRowArity)
- reduceFunction
- }
-
- /**
- * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
- */
- private[flink] def createAllWindowAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty])
- : AllWindowFunction[Row, Row, DataStreamWindow] = {
-
- val aggFunction =
- createAggregateGroupReduceFunction(
- namedAggregates,
- inputType,
- outputType,
- groupings)
-
- if (isTimeWindow(window)) {
- val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
- new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
- .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
- } else {
- new AggregateAllWindowFunction(aggFunction)
- }
- }
-
- /**
- * Create a [[WindowFunction]] to compute partitioned group window aggregates.
- *
- */
- private[flink] def createWindowAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty])
- : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
-
- val aggFunction =
- createAggregateGroupReduceFunction(
- namedAggregates,
- inputType,
- outputType,
- groupings)
-
- if (isTimeWindow(window)) {
- val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
- new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
- .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
- } else {
- new AggregateWindowFunction(aggFunction)
- }
- }
-
-  /**
-    * Creates an [[AllWindowFunction]] that finalizes incrementally pre-computed aggregates of a
-    * non-partitioned window and writes them into output rows of `outputType`.
-    * Time windows use [[IncrementalAggregateAllTimeWindowFunction]], which additionally receives
-    * the positions of the window start/end properties.
-    *
-    * @throws TableException if a grouping key or aggregate has no matching output field
-    */
-  private[flink] def createAllWindowIncrementalAggregationFunction(
-      window: LogicalWindow,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      outputType: RelDataType,
-      groupings: Array[Int],
-      properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
-
-    val (_, aggregates) = transformToAggregateFunctions(
-      namedAggregates.map(_.getKey),
-      inputType,
-      groupings.length)
-
-    val (groupKeyMapping, aggregateMapping) = getGroupingOffsetAndAggOffsetMapping(
-      namedAggregates,
-      inputType,
-      outputType,
-      groupings)
-
-    val outputArity = outputType.getFieldCount
-
-    if (!isTimeWindow(window)) {
-      new IncrementalAggregateAllWindowFunction(
-        aggregates,
-        groupKeyMapping,
-        aggregateMapping,
-        outputArity)
-    } else {
-      val (windowStart, windowEnd) = computeWindowStartEndPropertyPos(properties)
-      new IncrementalAggregateAllTimeWindowFunction(
-        aggregates, groupKeyMapping, aggregateMapping, outputArity, windowStart, windowEnd)
-        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
-    }
-  }
-
-  /**
-    * Creates a [[WindowFunction]] that finalizes incrementally pre-computed aggregates of a
-    * partitioned (keyed) window and writes them into output rows of `outputType`.
-    * Time windows use [[IncrementalAggregateTimeWindowFunction]], which additionally receives
-    * the positions of the window start/end properties.
-    *
-    * @throws TableException if a grouping key or aggregate has no matching output field
-    */
-  private[flink] def createWindowIncrementalAggregationFunction(
-      window: LogicalWindow,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      outputType: RelDataType,
-      groupings: Array[Int],
-      properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
-
-    val (_, aggregates) = transformToAggregateFunctions(
-      namedAggregates.map(_.getKey),
-      inputType,
-      groupings.length)
-
-    val (groupKeyMapping, aggregateMapping) = getGroupingOffsetAndAggOffsetMapping(
-      namedAggregates,
-      inputType,
-      outputType,
-      groupings)
-
-    val outputArity = outputType.getFieldCount
-
-    if (!isTimeWindow(window)) {
-      new IncrementalAggregateWindowFunction(
-        aggregates,
-        groupKeyMapping,
-        aggregateMapping,
-        outputArity)
-    } else {
-      val (windowStart, windowEnd) = computeWindowStartEndPropertyPos(properties)
-      new IncrementalAggregateTimeWindowFunction(
-        aggregates, groupKeyMapping, aggregateMapping, outputArity, windowStart, windowEnd)
-        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
-    }
-  }
-
-  /**
-    * Checks whether every given aggregate call can be computed partially (pre-aggregated and
-    * merged later).
-    *
-    * @return true if all aggregates support partial aggregation, false otherwise
-    */
-  private[flink] def doAllSupportPartialAggregation(
-      aggregateCalls: Seq[AggregateCall],
-      inputType: RelDataType,
-      groupKeysCount: Int): Boolean = {
-    val (_, aggregates) = transformToAggregateFunctions(aggregateCalls, inputType, groupKeysCount)
-    !aggregates.exists(!_.supportPartial)
-  }
-
-  /**
-    * Computes the index mappings used to build output rows from intermediate aggregate rows.
-    *
-    * @return a pair (groupingOffsetMapping, aggOffsetMapping): the first pairs each matched
-    *         output field index with the grouping key's index in the intermediate Row, the
-    *         second pairs each matched output field index with the index of its aggregate
-    *         function in `namedAggregates`
-    * @throws TableException if some grouping key or aggregate has no matching output field
-    */
-  private def getGroupingOffsetAndAggOffsetMapping(
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      outputType: RelDataType,
-      groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
-
-    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
-    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-    // every grouping key and every aggregate must map to exactly one output field
-    val allGroupKeysFound = groupingOffsetMapping.length == groupings.length
-    val allAggregatesFound = aggOffsetMapping.length == namedAggregates.length
-    if (!(allGroupKeysFound && allAggregatesFound)) {
-      throw new TableException(
-        "Could not find output field in input data type " +
-          "or aggregate functions.")
-    }
-
-    (groupingOffsetMapping, aggOffsetMapping)
-  }
-
-  /**
-    * Tells whether a group window is time-based. Session windows always are. Tumbling and
-    * sliding windows are time-based when their size expression has a time-interval result type.
-    * Any other window type is not matched and raises a MatchError.
-    */
-  private def isTimeWindow(window: LogicalWindow): Boolean = {
-    window match {
-      case ProcessingTimeSessionGroupWindow(_, _) | EventTimeSessionGroupWindow(_, _, _) =>
-        true
-      case ProcessingTimeTumblingGroupWindow(_, size) =>
-        isTimeInterval(size.resultType)
-      case EventTimeTumblingGroupWindow(_, _, size) =>
-        isTimeInterval(size.resultType)
-      case ProcessingTimeSlidingGroupWindow(_, size, _) =>
-        isTimeInterval(size.resultType)
-      case EventTimeSlidingGroupWindow(_, _, size, _) =>
-        isTimeInterval(size.resultType)
-    }
-  }
-
-  /**
-    * Determines the positions of the WindowStart and WindowEnd properties.
-    *
-    * Positions are assigned right to left. The last property gets 0, the one before it -1,
-    * and so on. Presumably these are offsets relative to the end of the output row; TODO
-    * confirm against TimeWindowPropertyCollector.
-    *
-    * @return the optional WindowStart position and the optional WindowEnd position
-    * @throws TableException if WindowStart or WindowEnd occurs more than once
-    */
-  private def computeWindowStartEndPropertyPos(
-      properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int]) = {
-
-    val (startPos, endPos, _) =
-      properties.foldRight((Option.empty[Int], Option.empty[Int], 0)) {
-        case (NamedWindowProperty(_, prop), (start, end, pos)) =>
-          prop match {
-            case WindowStart(_) if start.isDefined =>
-              throw new TableException("Duplicate WindowStart property encountered. This is a bug.")
-            case WindowStart(_) =>
-              (Some(pos), end, pos - 1)
-            case WindowEnd(_) if end.isDefined =>
-              throw new TableException("Duplicate WindowEnd property encountered. This is a bug.")
-            case WindowEnd(_) =>
-              (start, Some(pos), pos - 1)
-          }
-      }
-
-    (startPos, endPos)
-  }
-
-  /**
-    * Instantiates one runtime [[Aggregate]] per aggregate call. The implementation is chosen
-    * by the call's SQL aggregate function and the SQL type of its argument field. Each
-    * aggregate also gets its start offset in the intermediate Row; the grouping keys occupy
-    * the first `groupKeysCount` fields.
-    *
-    * @param aggregateCalls the aggregate calls to translate
-    * @param inputType the row type that the aggregate argument indexes refer to
-    * @param groupKeysCount the number of grouping keys, i.e. the offset of the first
-    *                       aggregate field in the intermediate Row
-    * @return the argument field index of each call and the matching aggregate instances,
-    *         both in the order of `aggregateCalls`
-    * @throws TableException if a non-COUNT call has no argument, a call has more than one
-    *                        argument, the argument type is not supported by SUM/AVG/MIN/MAX,
-    *                        or the aggregate function itself is not supported
-    */
-  private def transformToAggregateFunctions(
-      aggregateCalls: Seq[AggregateCall],
-      inputType: RelDataType,
-      groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = {
-
-    // store the aggregate fields of each aggregate function, by the same order of aggregates.
-    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
-    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
-
-    // set the start offset of aggregate buffer value to group keys' length,
-    // as all the group keys would be moved to the start fields of intermediate
-    // aggregate data.
-    var aggOffset = groupKeysCount
-
-    // create aggregate function instances by function type and aggregate field data type.
-    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
-      val argList: util.List[Integer] = aggregateCall.getArgList
-      if (argList.isEmpty) {
-        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-          // COUNT(*) has no argument. Field 0 is only used for the type lookup below, and the
-          // COUNT branch ignores that result.
-          // NOTE(review): this assumes the input row has at least one field.
-          aggFieldIndexes(index) = 0
-        } else {
-          throw new TableException("Aggregate fields should not be empty.")
-        }
-      } else {
-        if (argList.size() > 1) {
-          throw new TableException("Currently, do not support aggregate on multi fields.")
-        }
-        aggFieldIndexes(index) = argList.get(0)
-      }
-      // the SQL type of the argument field selects the type-specialized implementation below
-      val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
-      aggregateCall.getAggregation match {
-        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
-          aggregates(index) = sqlTypeName match {
-            case TINYINT =>
-              new ByteSumAggregate
-            case SMALLINT =>
-              new ShortSumAggregate
-            case INTEGER =>
-              new IntSumAggregate
-            case BIGINT =>
-              new LongSumAggregate
-            case FLOAT =>
-              new FloatSumAggregate
-            case DOUBLE =>
-              new DoubleSumAggregate
-            case DECIMAL =>
-              new DecimalSumAggregate
-            case sqlType: SqlTypeName =>
-              throw new TableException("Sum aggregate does no support type:" + sqlType)
-          }
-        }
-        case _: SqlAvgAggFunction => {
-          aggregates(index) = sqlTypeName match {
-            case TINYINT =>
-              new ByteAvgAggregate
-            case SMALLINT =>
-              new ShortAvgAggregate
-            case INTEGER =>
-              new IntAvgAggregate
-            case BIGINT =>
-              new LongAvgAggregate
-            case FLOAT =>
-              new FloatAvgAggregate
-            case DOUBLE =>
-              new DoubleAvgAggregate
-            case DECIMAL =>
-              new DecimalAvgAggregate
-            case sqlType: SqlTypeName =>
-              throw new TableException("Avg aggregate does no support type:" + sqlType)
-          }
-        }
-        // MIN and MAX share one function class; getKind tells them apart
-        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
-          aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
-            sqlTypeName match {
-              case TINYINT =>
-                new ByteMinAggregate
-              case SMALLINT =>
-                new ShortMinAggregate
-              case INTEGER =>
-                new IntMinAggregate
-              case BIGINT =>
-                new LongMinAggregate
-              case FLOAT =>
-                new FloatMinAggregate
-              case DOUBLE =>
-                new DoubleMinAggregate
-              case DECIMAL =>
-                new DecimalMinAggregate
-              case BOOLEAN =>
-                new BooleanMinAggregate
-              case sqlType: SqlTypeName =>
-                throw new TableException("Min aggregate does no support type:" + sqlType)
-            }
-          } else {
-            sqlTypeName match {
-              case TINYINT =>
-                new ByteMaxAggregate
-              case SMALLINT =>
-                new ShortMaxAggregate
-              case INTEGER =>
-                new IntMaxAggregate
-              case BIGINT =>
-                new LongMaxAggregate
-              case FLOAT =>
-                new FloatMaxAggregate
-              case DOUBLE =>
-                new DoubleMaxAggregate
-              case DECIMAL =>
-                new DecimalMaxAggregate
-              case BOOLEAN =>
-                new BooleanMaxAggregate
-              case sqlType: SqlTypeName =>
-                throw new TableException("Max aggregate does no support type:" + sqlType)
-            }
-          }
-        }
-        // COUNT does not depend on the argument type
-        case _: SqlCountAggFunction =>
-          aggregates(index) = new CountAggregate
-        case unSupported: SqlAggFunction =>
-          throw new TableException("unsupported Function: " + unSupported.getName)
-      }
-      setAggregateDataOffset(index)
-    }
-
-    // Set the start index of aggregate `index`'s intermediate data in the Row, then advance
-    // aggOffset past its intermediate fields. This lays out the aggregates' fields back to
-    // back after the group keys.
-    def setAggregateDataOffset(index: Int): Unit = {
-      aggregates(index).setAggOffsetInRow(aggOffset)
-      aggOffset += aggregates(index).intermediateDataType.length
-    }
-
-    (aggFieldIndexes, aggregates)
-  }
-
-  /**
-    * Builds the row type of the intermediate aggregate buffer. The grouping key fields come
-    * first, typed as in `inputType`, followed by the intermediate fields of every aggregate in
-    * array order.
-    *
-    * @param groupings indexes of the grouping key fields in `inputType`
-    * @param aggregates the aggregates whose intermediate fields follow the grouping keys
-    * @param inputType the input row type
-    * @return the [[RowTypeInfo]] of the intermediate aggregate Row
-    */
-  private def createAggregateBufferDataType(
-      groupings: Array[Int],
-      aggregates: Array[Aggregate[_]],
-      inputType: RelDataType): RowTypeInfo = {
-
-    // field types of the grouping keys, taken from the input row type
-    val groupingTypes: Seq[TypeInformation[_]] = groupings
-      .map(inputType.getFieldList.get(_).getType)
-      .map(FlinkTypeFactory.toTypeInfo)
-
-    // field types of the intermediate (partial) results of all aggregates
-    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
-
-    // grouping keys first, followed by the aggregate buffers
-    val allFieldTypes = groupingTypes ++: aggTypes
-    new RowTypeInfo(allFieldTypes: _*)
-  }
-
-  /**
-    * Finds the output field of each aggregate. An output field matches an aggregate when its
-    * name equals the aggregate's name and its type equals the aggregate call's result type.
-    *
-    * @param namedAggregates the aggregate calls together with their output names
-    * @param outputType the output row type
-    * @return pairs of (output field index, index of the aggregate in `namedAggregates`),
-    *         ordered by output field index
-    */
-  private def getAggregateMapping(
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      outputType: RelDataType): Array[(Int, Int)] = {
-
-    val aggOffsetMapping = for {
-      (outputFieldType, outputIndex) <- outputType.getFieldList.zipWithIndex
-      (namedAggCall, aggregateIndex) <- namedAggregates.zipWithIndex
-      if namedAggCall.getValue.equals(outputFieldType.getName) &&
-        namedAggCall.getKey.getType.equals(outputFieldType.getType)
-    } yield (outputIndex, aggregateIndex)
-
-    aggOffsetMapping.toArray
-  }
-
-  /**
-    * Finds the output field of each grouping key. An output field is linked to an input field
-    * with the same name and type. If that input field is a grouping key, the output index is
-    * paired with the key's position in `groupKeys`. That position is also the key's index in
-    * the intermediate aggregate Row, because grouping keys are stored first there.
-    *
-    * @param inputDatType the input row type
-    * @param outputType the output row type
-    * @param groupKeys indexes of the grouping key fields in `inputDatType`
-    * @return pairs of (output field index, intermediate Row index of the grouping key),
-    *         ordered by output field index
-    */
-  private def getGroupKeysMapping(
-      inputDatType: RelDataType,
-      outputType: RelDataType,
-      groupKeys: Array[Int]): Array[(Int, Int)] = {
-
-    val groupingOffsetMapping = for {
-      (outputFieldType, outputIndex) <- outputType.getFieldList.zipWithIndex
-      (inputFieldType, inputIndex) <- inputDatType.getFieldList.zipWithIndex
-      if outputFieldType.getName.equals(inputFieldType.getName) &&
-        outputFieldType.getType.equals(inputFieldType.getType)
-      // only matching input fields that are grouping keys produce an entry
-      keyIndex <- groupKeys.indices
-      if inputIndex == groupKeys(keyIndex)
-    } yield (outputIndex, keyIndex)
-
-    groupingOffsetMapping.toArray
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
deleted file mode 100644
index 4e77549..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
-
-/**
-  * Window function that computes the aggregates of a window by passing all records of the
-  * window to the wrapped [[RichGroupReduceFunction]]. The key and the window are not used.
-  *
-  * @param groupReduceFunction the function that computes the aggregates of one group
-  * @tparam W the window type
-  */
-class AggregateWindowFunction[W <: Window](groupReduceFunction: RichGroupReduceFunction[Row, Row])
-  extends RichWindowFunction[Row, Row, Tuple, W] {
-
-  override def open(parameters: Configuration): Unit = {
-    groupReduceFunction.open(parameters)
-  }
-
-  override def apply(
-      key: Tuple,
-      window: W,
-      input: Iterable[Row],
-      out: Collector[Row]): Unit = {
-
-    groupReduceFunction.reduce(input, out)
-  }
-
-  // The wrapped function is opened in open(), so close it here too. This keeps its lifecycle
-  // balanced and releases anything it acquired while open.
-  override def close(): Unit = {
-    groupReduceFunction.close()
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
deleted file mode 100644
index 998ae62..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import com.google.common.math.LongMath
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-import java.math.BigDecimal
-import java.math.BigInteger
-
-/**
-  * Base class of the AVG aggregates. The intermediate state uses two adjacent fields of the
-  * intermediate Row: the partial sum at the aggregate's offset, then the partial count.
-  */
-abstract class AvgAggregate[T] extends Aggregate[T] {
-  protected var partialSumIndex: Int = _
-  protected var partialCountIndex: Int = _
-
-  /** AVG is computed from mergeable partial sums and counts. */
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    partialSumIndex = aggOffset
-    // the count field directly follows the sum field
-    partialCountIndex = partialSumIndex + 1
-  }
-}
-
-/**
-  * AVG over integral inputs. The partial sum and the count are both kept as Longs and merged
-  * with overflow-checked addition. Subclasses convert an input value in `doPrepare` and
-  * compute the final average in `doEvaluate`.
-  */
-abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, 0L)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value != null) {
-      doPrepare(value, partial)
-    } else {
-      // a null input contributes neither to the sum nor to the count
-      partial.setField(partialSumIndex, 0L)
-      partial.setField(partialCountIndex, 0L)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    def longAt(row: Row, index: Int): Long = row.getField(index).asInstanceOf[Long]
-
-    val partialSum = longAt(partial, partialSumIndex)
-    val partialCount = longAt(partial, partialCountIndex)
-    val bufferSum = longAt(buffer, partialSumIndex)
-    val bufferCount = longAt(buffer, partialCountIndex)
-
-    // LongMath.checkedAdd throws on overflow instead of wrapping around
-    buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def evaluate(buffer: Row): T = doEvaluate(buffer).asInstanceOf[T]
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  /** Writes the partial sum and count for one non-null input value. */
-  def doPrepare(value: Any, partial: Row): Unit
-
-  /** Computes the final average from the buffer, or null if nothing was counted. */
-  def doEvaluate(buffer: Row): Any
-}
-
-/** AVG over TINYINT values; the truncated Long average is narrowed back to Byte. */
-class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    partial.setField(partialSumIndex, value.asInstanceOf[Byte].toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val sum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val count = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (count != 0L) (sum / count).toByte else null
-  }
-}
-
-/** AVG over SMALLINT values; the truncated Long average is narrowed back to Short. */
-class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    partial.setField(partialSumIndex, value.asInstanceOf[Short].toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val sum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val count = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (count != 0L) (sum / count).toShort else null
-  }
-}
-
-/** AVG over INTEGER values; the truncated Long average is narrowed back to Int. */
-class IntAvgAggregate extends IntegralAvgAggregate[Int] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    partial.setField(partialSumIndex, value.asInstanceOf[Int].toLong)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val sum = buffer.getField(partialSumIndex).asInstanceOf[Long]
-    val count = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (count != 0L) (sum / count).toInt else null
-  }
-}
-
-/**
-  * AVG over BIGINT values. The partial sum is kept as a [[BigInteger]] so that adding up Long
-  * values is not limited to the Long range. The count is a Long merged with overflow-checked
-  * addition.
-  */
-class LongAvgAggregate extends IntegralAvgAggregate[Long] {
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.BIG_INT_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  private def sumOf(row: Row): BigInteger = row.getField(partialSumIndex).asInstanceOf[BigInteger]
-
-  private def countOf(row: Row): Long = row.getField(partialCountIndex).asInstanceOf[Long]
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, BigInteger.ZERO)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value != null) {
-      doPrepare(value, partial)
-    } else {
-      // a null input contributes neither to the sum nor to the count
-      partial.setField(partialSumIndex, BigInteger.ZERO)
-      partial.setField(partialCountIndex, 0L)
-    }
-  }
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    partial.setField(partialSumIndex, BigInteger.valueOf(value.asInstanceOf[Long]))
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = sumOf(partial)
-    val partialCount = countOf(partial)
-    val bufferSum = sumOf(buffer)
-    val bufferCount = countOf(buffer)
-    buffer.setField(partialSumIndex, partialSum.add(bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val sum = sumOf(buffer)
-    val count = countOf(buffer)
-    if (count == 0L) null else sum.divide(BigInteger.valueOf(count)).longValue()
-  }
-}
-
-/**
-  * AVG over floating-point inputs. The partial sum is kept as a Double and the count as a
-  * Long. Subclasses convert an input value in `doPrepare` and compute the final average in
-  * `doEvaluate`.
-  */
-abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] {
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, 0D)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      // a null input contributes neither to the sum nor to the count
-      partial.setField(partialSumIndex, 0D)
-      partial.setField(partialCountIndex, 0L)
-    } else {
-      doPrepare(value, partial)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-
-    buffer.setField(partialSumIndex, partialSum + bufferSum)
-    // Merge the count with overflow-checked addition, like the other AVG aggregates, so it
-    // fails loudly instead of silently wrapping around.
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  override def evaluate(buffer : Row): T = {
-    doEvaluate(buffer).asInstanceOf[T]
-  }
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.DOUBLE_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  /** Writes the partial sum and count for one non-null input value. */
-  def doPrepare(value: Any, partial: Row): Unit
-
-  /** Computes the final average from the buffer, or null if nothing was counted. */
-  def doEvaluate(buffer: Row): Any
-}
-
-/** AVG over FLOAT values; the average is computed in Double and narrowed to Float. */
-class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    partial.setField(partialSumIndex, value.asInstanceOf[Float].toDouble)
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val sum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val count = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (count != 0L) (sum / count).toFloat else null
-  }
-}
-
-/** AVG over DOUBLE values. */
-class DoubleAvgAggregate extends FloatingAvgAggregate[Double] {
-
-  override def doPrepare(value: Any, partial: Row): Unit = {
-    partial.setField(partialSumIndex, value.asInstanceOf[Double])
-    partial.setField(partialCountIndex, 1L)
-  }
-
-  override def doEvaluate(buffer: Row): Any = {
-    val sum = buffer.getField(partialSumIndex).asInstanceOf[Double]
-    val count = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (count != 0L) sum / count else null
-  }
-}
-
-/**
-  * AVG over DECIMAL values. The partial sum is kept as a [[BigDecimal]] and the count as a
-  * Long merged with overflow-checked addition.
-  */
-class DecimalAvgAggregate extends AvgAggregate[BigDecimal] {
-
-  override def intermediateDataType = Array(
-    BasicTypeInfo.BIG_DEC_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO)
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(partialSumIndex, BigDecimal.ZERO)
-    partial.setField(partialCountIndex, 0L)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      // a null input contributes neither to the sum nor to the count
-      initiate(partial)
-    } else {
-      val input = value.asInstanceOf[BigDecimal]
-      partial.setField(partialSumIndex, input)
-      partial.setField(partialCountIndex, 1L)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal]
-    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    buffer.setField(partialSumIndex, partialSum.add(bufferSum))
-    buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
-  }
-
-  /**
-    * Returns the average, or null if no non-null value was aggregated.
-    *
-    * The quotient is rounded to [[java.math.MathContext.DECIMAL128]] (34 significant digits,
-    * HALF_EVEN). An unbounded `divide` would throw ArithmeticException whenever the quotient
-    * has a non-terminating decimal expansion, e.g. a sum of 1 over a count of 3.
-    */
-  override def evaluate(buffer: Row): BigDecimal = {
-    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
-    if (bufferCount != 0) {
-      val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
-      bufferSum.divide(BigDecimal.valueOf(bufferCount), java.math.MathContext.DECIMAL128)
-    } else {
-      null.asInstanceOf[BigDecimal]
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
deleted file mode 100644
index 4d6d20b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.types.Row
-
-/**
-  * COUNT aggregate: counts the non-null input values. The intermediate state is a single
-  * Long field holding the partial count.
-  */
-class CountAggregate extends Aggregate[Long] {
-  private var countIndex: Int = _
-
-  private def countIn(row: Row): Long = row.getField(countIndex).asInstanceOf[Long]
-
-  override def initiate(intermediate: Row): Unit = intermediate.setField(countIndex, 0L)
-
-  override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialCount = countIn(intermediate)
-    val bufferCount = countIn(buffer)
-    buffer.setField(countIndex, partialCount + bufferCount)
-  }
-
-  override def evaluate(buffer: Row): Long = countIn(buffer)
-
-  override def prepare(value: Any, intermediate: Row): Unit = {
-    // each non-null value contributes one to the count; a null contributes nothing
-    val contribution = if (value == null) 0L else 1L
-    intermediate.setField(countIndex, contribution)
-  }
-
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggIndex: Int): Unit = {
-    countIndex = aggIndex
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
deleted file mode 100644
index 48e2313..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
-import org.apache.flink.util.Collector
-/**
-  * Computes the final aggregate values of a non-partitioned time window from incrementally
-  * computed aggregates. Output is routed through a [[TimeWindowPropertyCollector]], which is
-  * given the current window and the positions of the window start/end properties.
-  *
-  * @param aggregates The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row.
-  * @param finalRowArity The arity of the final output row.
-  * @param windowStartPos The optional position of the window start property.
-  * @param windowEndPos The optional position of the window end property.
-  */
-class IncrementalAggregateAllTimeWindowFunction(
-    // Only forwarded to the superclass, which keeps its own private copies. They are plain
-    // parameters so this class does not store (and serialize) them a second time.
-    aggregates: Array[Aggregate[_ <: Any]],
-    groupKeysMapping: Array[(Int, Int)],
-    aggregateMapping: Array[(Int, Int)],
-    finalRowArity: Int,
-    private val windowStartPos: Option[Int],
-    private val windowEndPos: Option[Int])
-  extends IncrementalAggregateAllWindowFunction[TimeWindow](
-    aggregates,
-    groupKeysMapping,
-    aggregateMapping,
-    finalRowArity) {
-
-  private var collector: TimeWindowPropertyCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
-    super.open(parameters)
-  }
-
-  override def apply(
-      window: TimeWindow,
-      records: Iterable[Row],
-      out: Collector[Row]): Unit = {
-
-    // point the property collector at the downstream collector and the current window, then
-    // let the superclass emit its result through it
-    collector.wrappedCollector = out
-    collector.timeWindow = window
-
-    super.apply(window, records, collector)
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
deleted file mode 100644
index 1a85dca..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
-  * Computes the final aggregate value from incrementally computed aggregates.
-  *
-  * Only the first record of a window is evaluated. Any further records are ignored, and
-  * nothing is emitted for a window that has no records.
-  *
-  * @param aggregates The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row, as pairs of (output index, intermediate index).
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row, as pairs of (output index, aggregate index).
-  * @param finalRowArity The arity of the final output row.
-  * @tparam W The window type.
-  */
-class IncrementalAggregateAllWindowFunction[W <: Window](
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val finalRowArity: Int)
-  extends RichAllWindowFunction[Row, Row, W] {
-
-  // Created once in open() and overwritten for every emitted result.
-  private var output: Row = _
-
-  override def open(parameters: Configuration): Unit = {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
-    // apply() dereferences aggregateMapping too, so validate it up front as well.
-    Preconditions.checkNotNull(aggregateMapping)
-    output = new Row(finalRowArity)
-  }
-
-  /**
-    * Calculate aggregated values output by aggregate buffer, and set them into output
-    * Row based on the mapping relation between intermediate aggregate data and output data.
-    *
-    * @param window The window being evaluated (not read).
-    * @param records The incrementally pre-aggregated rows of the window.
-    * @param out The collector that receives the (reused) output row.
-    */
-  override def apply(
-      window: W,
-      records: Iterable[Row],
-      out: Collector[Row]): Unit = {
-
-    val iterator = records.iterator
-
-    if (iterator.hasNext) {
-      val record = iterator.next()
-      // Set group keys value to final output.
-      groupKeysMapping.foreach {
-        case (after, previous) =>
-          output.setField(after, record.getField(previous))
-      }
-      // Evaluate final aggregate value and set to output.
-      aggregateMapping.foreach {
-        case (after, previous) =>
-          output.setField(after, aggregates(previous).evaluate(record))
-      }
-      out.collect(output)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
deleted file mode 100644
index 5c36821..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
-
-/**
-  * Incrementally computes group window aggregates by merging pairs of intermediate
-  * aggregate rows.
-  *
-  * @param aggregates The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param intermediateRowArity The number of fields of the intermediate aggregate rows.
-  */
-class IncrementalAggregateReduceFunction(
-    private val aggregates: Array[Aggregate[_]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val intermediateRowArity: Int)
-  extends ReduceFunction[Row] {
-
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(groupKeysMapping)
-
-  /**
-    * Merges two intermediate aggregate rows.
-    *
-    * A new row receives a copy of every field of value1, leaving value1 itself untouched.
-    * Each aggregate then merges its partial result from value2 into that new row.
-    *
-    * @param value1 The first value to be combined.
-    * @param value2 The second value to be combined.
-    * @return A newly created row that combines the two input values.
-    */
-  override def reduce(value1: Row, value2: Row): Row = {
-    // TODO: once FLINK-5105 is solved, we can avoid creating a new row for each invocation
-    // and directly merge value1 and value2.
-    val merged = new Row(intermediateRowArity)
-    for (pos <- 0 until intermediateRowArity) {
-      merged.setField(pos, value1.getField(pos))
-    }
-    for (agg <- aggregates) {
-      agg.merge(value2, merged)
-    }
-    merged
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
deleted file mode 100644
index 2513383..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.util.Collector
-
-/**
-  * Computes the final aggregate value from incrementally computed aggregates for keyed time
-  * windows. Results are emitted through a [[TimeWindowPropertyCollector]] that wraps the
-  * output collector and is given the current [[TimeWindow]].
-  *
-  * @param aggregates The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row.
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row.
-  * @param finalRowArity The arity of the final output row.
-  * @param windowStartPos Optional window start position, forwarded to the
-  *                       [[TimeWindowPropertyCollector]].
-  * @param windowEndPos Optional window end position, forwarded to the
-  *                     [[TimeWindowPropertyCollector]].
-  */
-class IncrementalAggregateTimeWindowFunction(
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val finalRowArity: Int,
-    private val windowStartPos: Option[Int],
-    private val windowEndPos: Option[Int])
-  extends IncrementalAggregateWindowFunction[TimeWindow](
-    aggregates,
-    groupKeysMapping,
-    aggregateMapping,
-    finalRowArity) {
-
-  // Created in open(); re-pointed at the current output and window on every apply() call.
-  private var propertyCollector: TimeWindowPropertyCollector = _
-
-  override def open(parameters: Configuration): Unit = {
-    propertyCollector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
-    super.open(parameters)
-  }
-
-  override def apply(
-      key: Tuple,
-      window: TimeWindow,
-      records: Iterable[Row],
-      out: Collector[Row]): Unit = {
-    // bind this invocation's output and window, then delegate emission to the parent
-    propertyCollector.wrappedCollector = out
-    propertyCollector.timeWindow = window
-    super.apply(key, window, records, propertyCollector)
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
deleted file mode 100644
index d0d71ee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.lang.Iterable
-
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
-import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
-  * Computes the final aggregate value from incrementally computed aggregates for keyed windows.
-  *
-  * Only the first record of a window is evaluated. Any further records are ignored, and
-  * nothing is emitted for a window that has no records. The key itself is not read.
-  *
-  * @param aggregates The aggregate functions.
-  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
-  *                         and output Row, as pairs of (output index, intermediate index).
-  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
-  *                         index in output Row, as pairs of (output index, aggregate index).
-  * @param finalRowArity The arity of the final output row.
-  * @tparam W The window type.
-  */
-class IncrementalAggregateWindowFunction[W <: Window](
-    private val aggregates: Array[Aggregate[_ <: Any]],
-    private val groupKeysMapping: Array[(Int, Int)],
-    private val aggregateMapping: Array[(Int, Int)],
-    private val finalRowArity: Int)
-  extends RichWindowFunction[Row, Row, Tuple, W] {
-
-  // Created once in open() and overwritten for every emitted result.
-  private var output: Row = _
-
-  override def open(parameters: Configuration): Unit = {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
-    // apply() dereferences aggregateMapping too, so validate it up front as well.
-    Preconditions.checkNotNull(aggregateMapping)
-    output = new Row(finalRowArity)
-  }
-
-  /**
-    * Calculate aggregated values output by aggregate buffer, and set them into output
-    * Row based on the mapping relation between intermediate aggregate data and output data.
-    *
-    * @param key The window key (not read).
-    * @param window The window being evaluated (not read).
-    * @param records The incrementally pre-aggregated rows of the window.
-    * @param out The collector that receives the (reused) output row.
-    */
-  override def apply(
-      key: Tuple,
-      window: W,
-      records: Iterable[Row],
-      out: Collector[Row]): Unit = {
-
-    val iterator = records.iterator
-
-    if (iterator.hasNext) {
-      val record = iterator.next()
-      // Set group keys value to final output.
-      groupKeysMapping.foreach {
-        case (after, previous) =>
-          output.setField(after, record.getField(previous))
-      }
-      // Evaluate final aggregate value and set to output.
-      aggregateMapping.foreach {
-        case (after, previous) =>
-          output.setField(after, aggregates(previous).evaluate(record))
-      }
-      out.collect(output)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
deleted file mode 100644
index 2cb3dc7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-/**
-  * Base class for MAX aggregates over any type with an implicit [[Ordering]].
-  *
-  * The running maximum lives in a single field of the intermediate row, whose index is set
-  * through [[setAggOffsetInRow]]. A null field means that no non-null value has been seen yet.
-  * A null partial value never overwrites the buffer during merge.
-  *
-  * @tparam T The type of the aggregated values.
-  */
-abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
-
-  // Field index of the running maximum in the intermediate row; -1 until setAggOffsetInRow.
-  protected var maxIndex = -1
-
-  /**
-    * Initiate the intermediate aggregate value in Row by clearing it to null.
-    *
-    * @param intermediate The intermediate aggregate row to initiate.
-    */
-  override def initiate(intermediate: Row): Unit =
-    intermediate.setField(maxIndex, null)
-
-  /**
-    * Accessed in MapFunction, prepare the input of partial aggregate.
-    *
-    * @param value The input value. If it is null, the field is re-initiated instead.
-    * @param intermediate The row that receives the partial maximum.
-    */
-  override def prepare(value: Any, intermediate: Row): Unit =
-    if (value == null) initiate(intermediate) else intermediate.setField(maxIndex, value)
-
-  /**
-    * Accessed in CombineFunction and GroupReduceFunction, merge partial
-    * aggregate result into aggregate buffer.
-    *
-    * @param intermediate The row holding a partial maximum.
-    * @param buffer The row holding the running maximum; updated in place.
-    */
-  override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialValue = intermediate.getField(maxIndex).asInstanceOf[T]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(maxIndex).asInstanceOf[T]
-      // compare() (not gt()) is used deliberately: some Orderings override gt() with
-      // different semantics, e.g. for floating-point NaN.
-      val winner =
-        if (bufferValue == null) partialValue
-        else if (ord.compare(partialValue, bufferValue) > 0) partialValue
-        else bufferValue
-      buffer.setField(maxIndex, winner)
-    }
-  }
-
-  /**
-    * Return the final aggregated result based on aggregate buffer.
-    *
-    * @param buffer The row holding the running maximum.
-    * @return The maximum, or null if no non-null value was aggregated.
-    */
-  override def evaluate(buffer: Row): T =
-    buffer.getField(maxIndex).asInstanceOf[T]
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit =
-    maxIndex = aggOffset
-}
-
-/** MAX aggregate over Byte values. */
-class ByteMaxAggregate extends MaxAggregate[Byte] {
-  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-}
-
-/** MAX aggregate over Short values. */
-class ShortMaxAggregate extends MaxAggregate[Short] {
-  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-}
-
-/** MAX aggregate over Int values. */
-class IntMaxAggregate extends MaxAggregate[Int] {
-  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-}
-
-/** MAX aggregate over Long values. */
-class LongMaxAggregate extends MaxAggregate[Long] {
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-}
-
-/** MAX aggregate over Float values, ordered by Scala's implicit Ordering[Float]. */
-class FloatMaxAggregate extends MaxAggregate[Float] {
-  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-}
-
-/** MAX aggregate over Double values, ordered by Scala's implicit Ordering[Double]. */
-class DoubleMaxAggregate extends MaxAggregate[Double] {
-  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-}
-
-/** MAX aggregate over Boolean values (true is greater than false). */
-class BooleanMaxAggregate extends MaxAggregate[Boolean] {
-  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
-}
-
-/**
-  * MAX aggregate over [[java.math.BigDecimal]] values, compared with `compareTo`.
-  *
-  * The running maximum lives in a single field of the intermediate row, whose index is set
-  * through [[setAggOffsetInRow]]. A null field means that no non-null value has been seen yet.
-  */
-class DecimalMaxAggregate extends Aggregate[BigDecimal] {
-
-  // Field index of the running maximum (previously misnamed `minIndex`, a copy-paste
-  // leftover). It starts at -1 like MaxAggregate.maxIndex, so accidental use before
-  // setAggOffsetInRow hits an invalid index instead of silently targeting field 0.
-  protected var maxIndex: Int = -1
-
-  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
-  /**
-    * Clears the running maximum in the given row to null.
-    *
-    * @param intermediate The intermediate aggregate row to initiate.
-    */
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, null)
-  }
-
-  /**
-    * Stores `value` as the partial maximum, or re-initiates the field when it is null.
-    *
-    * @param value The input value.
-    * @param partial The row that receives the partial maximum.
-    */
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      partial.setField(maxIndex, value)
-    }
-  }
-
-  /**
-    * Merges a partial maximum into the buffer. A null partial value leaves the buffer unchanged.
-    *
-    * @param partial The row holding a partial maximum.
-    * @param buffer The row holding the running maximum; updated in place.
-    */
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.getField(maxIndex).asInstanceOf[BigDecimal]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(maxIndex).asInstanceOf[BigDecimal]
-      if (bufferValue != null) {
-        val max = if (partialValue.compareTo(bufferValue) > 0) partialValue else bufferValue
-        buffer.setField(maxIndex, max)
-      } else {
-        buffer.setField(maxIndex, partialValue)
-      }
-    }
-  }
-
-  /**
-    * Returns the running maximum, or null if no non-null value was aggregated.
-    *
-    * @param buffer The row holding the running maximum.
-    */
-  override def evaluate(buffer: Row): BigDecimal = {
-    buffer.getField(maxIndex).asInstanceOf[BigDecimal]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    maxIndex = aggOffset
-  }
-}