You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/24 09:09:24 UTC
flink git commit: [FLINK-5767] [table] Add interface for user-defined
aggregate functions and built-in aggregate functions.
Repository: flink
Updated Branches:
refs/heads/master 417597fbf -> dd8ef550c
[FLINK-5767] [table] Add interface for user-defined aggregate functions and built-in aggregate functions.
This closes #3354.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd8ef550
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd8ef550
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd8ef550
Branch: refs/heads/master
Commit: dd8ef550cf4c590c5a84ba313c57e202d4df94f4
Parents: 417597f
Author: Shaoxuan Wang <ws...@gmail.com>
Authored: Sat Feb 18 17:18:38 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Feb 24 10:07:44 2017 +0100
----------------------------------------------------------------------
.../table/functions/AggregateFunction.scala | 76 +++++
.../functions/aggfunctions/AvgAggFunction.scala | 289 +++++++++++++++++++
.../aggfunctions/CountAggFunction.scala | 57 ++++
.../functions/aggfunctions/MaxAggFunction.scala | 126 ++++++++
.../functions/aggfunctions/MinAggFunction.scala | 126 ++++++++
.../functions/aggfunctions/SumAggFunction.scala | 153 ++++++++++
.../utils/UserDefinedFunctionUtils.scala | 13 +
.../aggfunctions/AggFunctionTestBase.scala | 98 +++++++
.../aggfunctions/AvgFunctionTest.scala | 186 ++++++++++++
.../aggfunctions/CountAggFunctionTest.scala | 36 +++
.../aggfunctions/MaxAggFunctionTest.scala | 188 ++++++++++++
.../aggfunctions/MinAggFunctionTest.scala | 188 ++++++++++++
.../aggfunctions/SumAggFunctionTest.scala | 147 ++++++++++
13 files changed, 1683 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
new file mode 100644
index 0000000..e15a8c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import java.util.{List => JList}
+
+/**
+ * Base class for User-Defined Aggregates.
+ *
+ * @tparam T the type of the aggregation result
+ */
+trait AggregateFunction[T] extends UserDefinedFunction {
+ /**
+ * Create and init the Accumulator for this [[AggregateFunction]].
+ *
+ * @return the accumulator with the initial value
+ */
+ def createAccumulator(): Accumulator
+
+ /**
+ * Called every time when an aggregation result should be materialized.
+ * The returned value could be either an early and incomplete result
+ * (periodically emitted as data arrive) or the final result of the
+ * aggregation.
+ *
+ * @param accumulator the accumulator which contains the current
+ * aggregated results
+ * @return the aggregation result
+ */
+ def getValue(accumulator: Accumulator): T
+
+ /**
+ * Process the input values and update the provided accumulator instance.
+ *
+ * @param accumulator the accumulator which contains the current
+ * aggregated results
+ * @param input the input value (usually obtained from a new arrived data)
+ */
+ def accumulate(accumulator: Accumulator, input: Any): Unit
+
+ /**
+ * Merge a list of accumulator instances into one accumulator instance.
+ *
+ * @param accumulators the [[java.util.List]] of accumulators
+ * that will be merged
+ * @return the resulting accumulator
+ */
+ def merge(accumulators: JList[Accumulator]): Accumulator
+}
+
+/**
+ * Base class for aggregate Accumulator. The accumulator is used to keep the
+ * aggregated values which are needed to compute an aggregation result.
+ * The state of the function must be put into the accumulator.
+ *
+ * TODO: We have the plan to have the accumulator and return types of
+ * functions dynamically provided by the users. This needs the refactoring
+ * of the AggregateFunction interface with the code generation. We will remove
+ * the [[Accumulator]] once codeGen for UDAGG is completed (FLINK-5813).
+ */
+trait Accumulator
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
new file mode 100644
index 0000000..f4c0b7b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.{BigDecimal, BigInteger}
+import java.util.{List => JList}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+ * Base class for built-in Integral Avg aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class IntegralAvgAggFunction[T] extends AggregateFunction[T] {
+
+ /** The initial accumulator for Integral Avg aggregate function */
+ class IntegralAvgAccumulator extends JTuple2[Long, Long] with Accumulator {
+ f0 = 0 //sum
+ f1 = 0 //count
+ }
+
+ override def createAccumulator(): Accumulator = {
+ new IntegralAvgAccumulator
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[Number].longValue()
+ val accum = accumulator.asInstanceOf[IntegralAvgAccumulator]
+ accum.f0 += v
+ accum.f1 += 1
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): T = {
+ val accum = accumulator.asInstanceOf[IntegralAvgAccumulator]
+ if (accum.f1 == 0) {
+ null.asInstanceOf[T]
+ } else {
+ resultTypeConvert(accum.f0 / accum.f1)
+ }
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = accumulators.get(0).asInstanceOf[IntegralAvgAccumulator]
+ var i: Int = 1
+ while (i < accumulators.size()) {
+ val a = accumulators.get(i).asInstanceOf[IntegralAvgAccumulator]
+ ret.f1 += a.f1
+ ret.f0 += a.f0
+ i += 1
+ }
+ ret
+ }
+
+ /**
+ * Convert the intermediate result to the expected aggregation result type
+ *
+ * @param value the intermediate result. We use a Long container to save
+ * the intermediate result to avoid the overflow by sum operation.
+ * @return the result value with the expected aggregation result type
+ */
+ def resultTypeConvert(value: Long): T
+}
+
+/**
+ * Built-in Byte Avg aggregate function
+ */
+class ByteAvgAggFunction extends IntegralAvgAggFunction[Byte] {
+ override def resultTypeConvert(value: Long): Byte = value.toByte
+}
+
+/**
+ * Built-in Short Avg aggregate function
+ */
+class ShortAvgAggFunction extends IntegralAvgAggFunction[Short] {
+ override def resultTypeConvert(value: Long): Short = value.toShort
+}
+
+/**
+ * Built-in Int Avg aggregate function
+ */
+class IntAvgAggFunction extends IntegralAvgAggFunction[Int] {
+ override def resultTypeConvert(value: Long): Int = value.toInt
+}
+
+/**
+ * Base Class for Built-in Big Integral Avg aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
+
+ /** The initial accumulator for Big Integral Avg aggregate function */
+ class BigIntegralAvgAccumulator
+ extends JTuple2[BigInteger, Long] with Accumulator {
+ f0 = BigInteger.ZERO //sum
+ f1 = 0 //count
+ }
+
+ override def createAccumulator(): Accumulator = {
+ new BigIntegralAvgAccumulator
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[Long]
+ val a = accumulator.asInstanceOf[BigIntegralAvgAccumulator]
+ a.f0 = a.f0.add(BigInteger.valueOf(v))
+ a.f1 += 1
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): T = {
+ val a = accumulator.asInstanceOf[BigIntegralAvgAccumulator]
+ if (a.f1 == 0) {
+ null.asInstanceOf[T]
+ } else {
+ resultTypeConvert(a.f0.divide(BigInteger.valueOf(a.f1)))
+ }
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = accumulators.get(0).asInstanceOf[BigIntegralAvgAccumulator]
+ var i: Int = 1
+ while (i < accumulators.size()) {
+ val a = accumulators.get(i).asInstanceOf[BigIntegralAvgAccumulator]
+ ret.f1 += a.f1
+ ret.f0 = ret.f0.add(a.f0)
+ i += 1
+ }
+ ret
+ }
+
+ /**
+ * Convert the intermediate result to the expected aggregation result type
+ *
+ * @param value the intermediate result. We use a BigInteger container to
+ * save the intermediate result to avoid the overflow by sum
+ * operation.
+ * @return the result value with the expected aggregation result type
+ */
+ def resultTypeConvert(value: BigInteger): T
+}
+
+/**
+ * Built-in Long Avg aggregate function
+ */
+class LongAvgAggFunction extends BigIntegralAvgAggFunction[Long] {
+ override def resultTypeConvert(value: BigInteger): Long = value.longValue()
+}
+
+/**
+ * Base class for built-in Floating Avg aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class FloatingAvgAggFunction[T] extends AggregateFunction[T] {
+
+ /** The initial accumulator for Floating Avg aggregate function */
+ class FloatingAvgAccumulator extends JTuple2[Double, Long] with Accumulator {
+ f0 = 0 //sum
+ f1 = 0 //count
+ }
+
+ override def createAccumulator(): Accumulator = {
+ new FloatingAvgAccumulator
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[Number].doubleValue()
+ val accum = accumulator.asInstanceOf[FloatingAvgAccumulator]
+ accum.f0 += v
+ accum.f1 += 1
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): T = {
+ val accum = accumulator.asInstanceOf[FloatingAvgAccumulator]
+ if (accum.f1 == 0) {
+ null.asInstanceOf[T]
+ } else {
+ resultTypeConvert(accum.f0 / accum.f1)
+ }
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = accumulators.get(0).asInstanceOf[FloatingAvgAccumulator]
+ var i: Int = 1
+ while (i < accumulators.size()) {
+ val a = accumulators.get(i).asInstanceOf[FloatingAvgAccumulator]
+ ret.f1 += a.f1
+ ret.f0 += a.f0
+ i += 1
+ }
+ ret
+ }
+
+ /**
+ * Convert the intermediate result to the expected aggregation result type
+ *
+ * @param value the intermediate result. We use a Double container to save
+ * the intermediate result to avoid the overflow by sum operation.
+ * @return the result value with the expected aggregation result type
+ */
+ def resultTypeConvert(value: Double): T
+}
+
+/**
+ * Built-in Float Avg aggregate function
+ */
+class FloatAvgAggFunction extends FloatingAvgAggFunction[Float] {
+ override def resultTypeConvert(value: Double): Float = value.toFloat
+}
+
+/**
+ * Built-in Int Double aggregate function
+ */
+class DoubleAvgAggFunction extends FloatingAvgAggFunction[Double] {
+ override def resultTypeConvert(value: Double): Double = value
+}
+
+/**
+ * Base class for built-in Big Decimal Avg aggregate function
+ */
+class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
+
+ /** The initial accumulator for Big Decimal Avg aggregate function */
+ class DecimalAvgAccumulator
+ extends JTuple2[BigDecimal, Long] with Accumulator {
+ f0 = BigDecimal.ZERO //sum
+ f1 = 0 //count
+ }
+
+ override def createAccumulator(): Accumulator = {
+ new DecimalAvgAccumulator
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[BigDecimal]
+ val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
+ if (accum.f1 == 0) {
+ accum.f0 = v
+ } else {
+ accum.f0 = accum.f0.add(v)
+ }
+ accum.f1 += 1
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): BigDecimal = {
+ val a = accumulator.asInstanceOf[DecimalAvgAccumulator]
+ if (a.f1 == 0) {
+ null.asInstanceOf[BigDecimal]
+ } else {
+ a.f0.divide(BigDecimal.valueOf(a.f1))
+ }
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = accumulators.get(0).asInstanceOf[DecimalAvgAccumulator]
+ var i: Int = 1
+ while (i < accumulators.size()) {
+ val a = accumulators.get(i).asInstanceOf[DecimalAvgAccumulator]
+ ret.f0 = ret.f0.add(a.f0)
+ ret.f1 += a.f1
+ i += 1
+ }
+ ret
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
new file mode 100644
index 0000000..8b903d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.util.{List => JList}
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+ * built-in count aggregate function
+ */
+class CountAggFunction extends AggregateFunction[Long] {
+
+ /** The initial accumulator for count aggregate function */
+ class CountAccumulator extends JTuple1[Long] with Accumulator {
+ f0 = 0 //count
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ accumulator.asInstanceOf[CountAccumulator].f0 += 1
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): Long = {
+ accumulator.asInstanceOf[CountAccumulator].f0
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = accumulators.get(0).asInstanceOf[CountAccumulator]
+ var i: Int = 1
+ while (i < accumulators.size()) {
+ ret.f0 += accumulators.get(i).asInstanceOf[CountAccumulator].f0
+ i += 1
+ }
+ ret
+ }
+
+ override def createAccumulator(): Accumulator = {
+ new CountAccumulator
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
new file mode 100644
index 0000000..20041ee
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{List => JList}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+ * Base class for built-in Max aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
+
+ /** The initial accumulator for Max aggregate function */
+ class MaxAccumulator extends JTuple2[T, Boolean] with Accumulator {
+ f0 = 0.asInstanceOf[T] //max
+ f1 = false
+ }
+
+ override def createAccumulator(): Accumulator = {
+ new MaxAccumulator
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[T]
+ val a = accumulator.asInstanceOf[MaxAccumulator]
+ if (!a.f1 || ord.compare(a.f0, v) < 0) {
+ a.f0 = v
+ a.f1 = true
+ }
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): T = {
+ val a = accumulator.asInstanceOf[MaxAccumulator]
+ if (a.f1) {
+ a.f0
+ } else {
+ null.asInstanceOf[T]
+ }
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = accumulators.get(0)
+ var i: Int = 1
+ while (i < accumulators.size()) {
+ val a = accumulators.get(i).asInstanceOf[MaxAccumulator]
+ if (a.f1) {
+ accumulate(ret.asInstanceOf[MaxAccumulator], a.f0)
+ }
+ i += 1
+ }
+ ret
+ }
+}
+
+/**
+ * Built-in Byte Max aggregate function
+ */
+class ByteMaxAggFunction extends MaxAggFunction[Byte]
+
+/**
+ * Built-in Short Max aggregate function
+ */
+class ShortMaxAggFunction extends MaxAggFunction[Short]
+
+/**
+ * Built-in Int Max aggregate function
+ */
+class IntMaxAggFunction extends MaxAggFunction[Int]
+
+/**
+ * Built-in Long Max aggregate function
+ */
+class LongMaxAggFunction extends MaxAggFunction[Long]
+
+/**
+ * Built-in Float Max aggregate function
+ */
+class FloatMaxAggFunction extends MaxAggFunction[Float]
+
+/**
+ * Built-in Double Max aggregate function
+ */
+class DoubleMaxAggFunction extends MaxAggFunction[Double]
+
+/**
+ * Built-in Boolean Max aggregate function
+ */
+class BooleanMaxAggFunction extends MaxAggFunction[Boolean]
+
+/**
+ * Built-in Big Decimal Max aggregate function
+ */
+class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[BigDecimal]
+ val accum = accumulator.asInstanceOf[MaxAccumulator]
+ if (!accum.f1 || accum.f0.compareTo(v) < 0) {
+ accum.f0 = v
+ accum.f1 = true
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
new file mode 100644
index 0000000..16461ae
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{List => JList}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+ * Base class for built-in Min aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
+
+ /** The initial accumulator for Min aggregate function */
+ class MinAccumulator extends JTuple2[T, Boolean] with Accumulator {
+ f0 = 0.asInstanceOf[T] //min
+ f1 = false
+ }
+
+ override def createAccumulator(): Accumulator = {
+ new MinAccumulator
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[T]
+ val a = accumulator.asInstanceOf[MinAccumulator]
+ if (!a.f1 || ord.compare(a.f0, v) > 0) {
+ a.f0 = v
+ a.f1 = true
+ }
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): T = {
+ val a = accumulator.asInstanceOf[MinAccumulator]
+ if (a.f1) {
+ a.f0
+ } else {
+ null.asInstanceOf[T]
+ }
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = accumulators.get(0)
+ var i: Int = 1
+ while (i < accumulators.size()) {
+ val a = accumulators.get(i).asInstanceOf[MinAccumulator]
+ if (a.f1) {
+ accumulate(ret.asInstanceOf[MinAccumulator], a.f0)
+ }
+ i += 1
+ }
+ ret
+ }
+}
+
+/**
+ * Built-in Byte Min aggregate function
+ */
+class ByteMinAggFunction extends MinAggFunction[Byte]
+
+/**
+ * Built-in Short Min aggregate function
+ */
+class ShortMinAggFunction extends MinAggFunction[Short]
+
+/**
+ * Built-in Int Min aggregate function
+ */
+class IntMinAggFunction extends MinAggFunction[Int]
+
+/**
+ * Built-in Long Min aggregate function
+ */
+class LongMinAggFunction extends MinAggFunction[Long]
+
+/**
+ * Built-in Float Min aggregate function
+ */
+class FloatMinAggFunction extends MinAggFunction[Float]
+
+/**
+ * Built-in Double Min aggregate function
+ */
+class DoubleMinAggFunction extends MinAggFunction[Double]
+
+/**
+ * Built-in Boolean Min aggregate function
+ */
+class BooleanMinAggFunction extends MinAggFunction[Boolean]
+
+/**
+ * Built-in Big Decimal Min aggregate function
+ */
+class DecimalMinAggFunction extends MinAggFunction[BigDecimal] {
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[BigDecimal]
+ val accum = accumulator.asInstanceOf[MinAccumulator]
+ if (!accum.f1 || accum.f0.compareTo(v) > 0) {
+ accum.f0 = v
+ accum.f1 = true
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
new file mode 100644
index 0000000..b04d8c0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{List => JList}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+ * Base class for built-in Sum aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
+
+ /** The initial accumulator for Sum aggregate function */
+ class SumAccumulator extends JTuple2[T, Boolean] with Accumulator {
+ f0 = numeric.zero //sum
+ f1 = false
+ }
+
+ private val numeric = implicitly[Numeric[T]]
+
+ override def createAccumulator(): Accumulator = {
+ new SumAccumulator
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[T]
+ val a = accumulator.asInstanceOf[SumAccumulator]
+ a.f0 = numeric.plus(v, a.f0)
+ a.f1 = true
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): T = {
+ val a = accumulator.asInstanceOf[SumAccumulator]
+ if (a.f1) {
+ a.f0
+ } else {
+ null.asInstanceOf[T]
+ }
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = createAccumulator().asInstanceOf[SumAccumulator]
+ var i: Int = 0
+ while (i < accumulators.size()) {
+ val a = accumulators.get(i).asInstanceOf[SumAccumulator]
+ if (a.f1) {
+ ret.f0 = numeric.plus(ret.f0, a.f0)
+ ret.f1 = true
+ }
+ i += 1
+ }
+ ret
+ }
+}
+
+/**
+ * Built-in Byte Sum aggregate function
+ */
+class ByteSumAggFunction extends SumAggFunction[Byte]
+
+/**
+ * Built-in Short Sum aggregate function
+ */
+class ShortSumAggFunction extends SumAggFunction[Short]
+
+/**
+ * Built-in Int Sum aggregate function
+ */
+class IntSumAggFunction extends SumAggFunction[Int]
+
+/**
+ * Built-in Long Sum aggregate function
+ */
+class LongSumAggFunction extends SumAggFunction[Long]
+
+/**
+ * Built-in Float Sum aggregate function
+ */
+class FloatSumAggFunction extends SumAggFunction[Float]
+
+/**
+ * Built-in Double Sum aggregate function
+ */
+class DoubleSumAggFunction extends SumAggFunction[Double]
+
+
+/**
+ * Built-in Big Decimal Sum aggregate function
+ */
+class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
+
+ /** The initial accumulator for Big Decimal Sum aggregate function */
+ class DecimalSumAccumulator extends JTuple2[BigDecimal, Boolean] with Accumulator {
+ f0 = BigDecimal.ZERO
+ f1 = false
+ }
+
+ override def createAccumulator(): Accumulator = {
+ new DecimalSumAccumulator
+ }
+
+ override def accumulate(accumulator: Accumulator, value: Any): Unit = {
+ if (value != null) {
+ val v = value.asInstanceOf[BigDecimal]
+ val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
+ accum.f0 = accum.f0.add(v)
+ accum.f1 = true
+ }
+ }
+
+ override def getValue(accumulator: Accumulator): BigDecimal = {
+ if (!accumulator.asInstanceOf[DecimalSumAccumulator].f1) {
+ null.asInstanceOf[BigDecimal]
+ } else {
+ accumulator.asInstanceOf[DecimalSumAccumulator].f0
+ }
+ }
+
+ override def merge(accumulators: JList[Accumulator]): Accumulator = {
+ val ret = accumulators.get(0).asInstanceOf[DecimalSumAccumulator]
+ var i: Int = 1
+ while (i < accumulators.size()) {
+ val a = accumulators.get(i).asInstanceOf[DecimalSumAccumulator]
+ if (a.f1) {
+ accumulate(ret, a.f0)
+ ret.f1 = true
+ }
+ i += 1
+ }
+ ret
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 16a6717b..aec4fbb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -119,6 +119,19 @@ object UserDefinedFunctionUtils {
}
/**
+ * Check if a given method exits in the given function
+ */
+ def ifMethodExitInFunction(method: String, function: UserDefinedFunction): Boolean = {
+ val methods = function
+ .getClass
+ .getMethods
+ .filter {
+ m => m.getName == method
+ }
+ !methods.isEmpty
+ }
+
+ /**
* Extracts "eval" methods and throws a [[ValidationException]] if no implementation
* can be found.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
new file mode 100644
index 0000000..627b25b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import java.util.{ArrayList => JArrayList, List => JList}
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+ * Base class for aggregate function test
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class AggFunctionTestBase[T] {
+ def inputValueSets: Seq[Seq[_]]
+
+ def expectedResults: Seq[T]
+
+ def aggregator: AggregateFunction[T]
+
+ @Test
+ // test aggregate functions without partial merge
+ def testAggregateWithoutMerge(): Unit = {
+ // iterate over input sets
+ for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
+ val accumulator = aggregateVals(vals)
+ val result = aggregator.getValue(accumulator)
+ validateResult(expected, result)
+ }
+ }
+
+ @Test
+ // test aggregate functions with partial merge
+ def testAggregateWithMerge(): Unit = {
+
+ if (ifMethodExitInFunction("merge", aggregator)) {
+ // iterate over input sets
+ for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
+ //equally split the vals sequence into two sequences
+ val (firstVals, secondVals) = vals.splitAt(vals.length / 2)
+
+ val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
+ accumulators.add(aggregateVals(firstVals))
+ accumulators.add(aggregateVals(secondVals))
+
+ val accumulator = aggregator.merge(accumulators)
+ val result = aggregator.getValue(accumulator)
+ validateResult(expected, result)
+ }
+
+ // iterate over input sets
+ for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
+ //test partial merge with an empty accumulator
+ val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
+ accumulators.add(aggregateVals(vals))
+ accumulators.add(aggregator.createAccumulator())
+
+ val accumulator = aggregator.merge(accumulators)
+ val result = aggregator.getValue(accumulator)
+ validateResult(expected, result)
+ }
+ }
+ }
+
+ private def validateResult(expected: T, result: T): Unit = {
+ (expected, result) match {
+ case (e: BigDecimal, r: BigDecimal) =>
+ // BigDecimal.equals() value and scale but we are only interested in value.
+ assert(e.compareTo(r) == 0)
+ case _ =>
+ assertEquals(expected, result)
+ }
+ }
+
+ private def aggregateVals(vals: Seq[_]): Accumulator = {
+ val accumulator = aggregator.createAccumulator()
+ vals.foreach(v => aggregator.accumulate(accumulator, v))
+ accumulator
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
new file mode 100644
index 0000000..a388acf
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+ * Test case for built-in average aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class AvgAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T] {
+
+ private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+ def minVal: T
+
+ def maxVal: T
+
+ override def inputValueSets: Seq[Seq[T]] = Seq(
+ Seq(
+ minVal,
+ minVal,
+ null.asInstanceOf[T],
+ minVal,
+ minVal,
+ null.asInstanceOf[T],
+ minVal,
+ minVal,
+ minVal
+ ),
+ Seq(
+ maxVal,
+ maxVal,
+ null.asInstanceOf[T],
+ maxVal,
+ maxVal,
+ null.asInstanceOf[T],
+ maxVal,
+ maxVal,
+ maxVal
+ ),
+ Seq(
+ minVal,
+ maxVal,
+ null.asInstanceOf[T],
+ numeric.fromInt(0),
+ numeric.negate(maxVal),
+ numeric.negate(minVal),
+ null.asInstanceOf[T]
+ ),
+ Seq(
+ numeric.fromInt(1),
+ numeric.fromInt(2),
+ null.asInstanceOf[T],
+ numeric.fromInt(3),
+ numeric.fromInt(4),
+ numeric.fromInt(5),
+ null.asInstanceOf[T]
+ ),
+ Seq(
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T]
+ )
+ )
+
+ override def expectedResults: Seq[T] = Seq(
+ minVal,
+ maxVal,
+ numeric.fromInt(0),
+ numeric.fromInt(3),
+ null.asInstanceOf[T]
+ )
+}
+
+class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte] {
+
+ override def minVal = (Byte.MinValue + 1).toByte
+
+ override def maxVal = (Byte.MaxValue - 1).toByte
+
+ override def aggregator = new ByteAvgAggFunction()
+}
+
+class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short] {
+
+ override def minVal = (Short.MinValue + 1).toShort
+
+ override def maxVal = (Short.MaxValue - 1).toShort
+
+ override def aggregator = new ShortAvgAggFunction()
+}
+
+class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int] {
+
+ override def minVal = Int.MinValue + 1
+
+ override def maxVal = Int.MaxValue - 1
+
+ override def aggregator = new IntAvgAggFunction()
+}
+
+class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long] {
+
+ override def minVal = Long.MinValue + 1
+
+ override def maxVal = Long.MaxValue - 1
+
+ override def aggregator = new LongAvgAggFunction()
+}
+
+class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float] {
+
+ override def minVal = Float.MinValue
+
+ override def maxVal = Float.MaxValue
+
+ override def aggregator = new FloatAvgAggFunction()
+}
+
+class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double] {
+
+ override def minVal = Float.MinValue
+
+ override def maxVal = Float.MaxValue
+
+ override def aggregator = new DoubleAvgAggFunction()
+}
+
+class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq(
+ new BigDecimal("987654321000000"),
+ new BigDecimal("-0.000000000012345"),
+ null,
+ new BigDecimal("0.000000000012345"),
+ new BigDecimal("-987654321000000"),
+ null,
+ new BigDecimal("0")
+ ),
+ Seq(
+ new BigDecimal("987654321000000"),
+ new BigDecimal("-0.000000000012345"),
+ null,
+ new BigDecimal("0.000000000012345"),
+ new BigDecimal("-987654321000000"),
+ null,
+ new BigDecimal("5")
+ ),
+ Seq(
+ null,
+ null,
+ null,
+ null
+ )
+ )
+
+ override def expectedResults: Seq[BigDecimal] = Seq(
+ BigDecimal.ZERO,
+ BigDecimal.ONE,
+ null
+ )
+
+ override def aggregator: AggregateFunction[BigDecimal] = new DecimalAvgAggFunction()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
new file mode 100644
index 0000000..d5f09b2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunctionTest.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+ * Test case for built-in count aggregate function
+ */
+class CountAggFunctionTest extends AggFunctionTestBase[Long] {
+
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq("a", "b", null, "c", null, "d", "e", null, "f"),
+ Seq(null, null, null, null, null, null)
+ )
+
+ override def expectedResults: Seq[Long] = Seq(6L, 0L)
+
+ override def aggregator: AggregateFunction[Long] = new CountAggFunction()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
new file mode 100644
index 0000000..91cbeea
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionTest.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+ * Test case for built-in max aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class MaxAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] {
+
+ private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+ def minVal: T
+
+ def maxVal: T
+
+ override def inputValueSets: Seq[Seq[T]] = Seq(
+ Seq(
+ numeric.fromInt(1),
+ null.asInstanceOf[T],
+ maxVal,
+ numeric.fromInt(-99),
+ numeric.fromInt(3),
+ numeric.fromInt(56),
+ numeric.fromInt(0),
+ minVal,
+ numeric.fromInt(-20),
+ numeric.fromInt(17),
+ null.asInstanceOf[T]
+ ),
+ Seq(
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T]
+ )
+ )
+
+ override def expectedResults: Seq[T] = Seq(
+ maxVal,
+ null.asInstanceOf[T]
+ )
+}
+
+class ByteMaxAggFunctionTest extends MaxAggFunctionTest[Byte] {
+
+ override def minVal = (Byte.MinValue + 1).toByte
+
+ override def maxVal = (Byte.MaxValue - 1).toByte
+
+ override def aggregator: AggregateFunction[Byte] = new ByteMaxAggFunction()
+}
+
+class ShortMaxAggFunctionTest extends MaxAggFunctionTest[Short] {
+
+ override def minVal = (Short.MinValue + 1).toShort
+
+ override def maxVal = (Short.MaxValue - 1).toShort
+
+ override def aggregator: AggregateFunction[Short] = new ShortMaxAggFunction()
+}
+
+class IntMaxAggFunctionTest extends MaxAggFunctionTest[Int] {
+
+ override def minVal = Int.MinValue + 1
+
+ override def maxVal = Int.MaxValue - 1
+
+ override def aggregator: AggregateFunction[Int] = new IntMaxAggFunction()
+}
+
+class LongMaxAggFunctionTest extends MaxAggFunctionTest[Long] {
+
+ override def minVal = Long.MinValue + 1
+
+ override def maxVal = Long.MaxValue - 1
+
+ override def aggregator: AggregateFunction[Long] = new LongMaxAggFunction()
+}
+
+class FloatMaxAggFunctionTest extends MaxAggFunctionTest[Float] {
+
+ override def minVal = Float.MinValue / 2
+
+ override def maxVal = Float.MaxValue / 2
+
+ override def aggregator: AggregateFunction[Float] = new FloatMaxAggFunction()
+}
+
+class DoubleMaxAggFunctionTest extends MaxAggFunctionTest[Double] {
+
+ override def minVal = Double.MinValue / 2
+
+ override def maxVal = Double.MaxValue / 2
+
+ override def aggregator: AggregateFunction[Double] = new DoubleMaxAggFunction()
+}
+
+class BooleanMaxAggFunctionTest extends AggFunctionTestBase[Boolean] {
+
+ override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+ Seq(
+ false,
+ false,
+ false
+ ),
+ Seq(
+ true,
+ true,
+ true
+ ),
+ Seq(
+ true,
+ false,
+ null.asInstanceOf[Boolean],
+ true,
+ false,
+ true,
+ null.asInstanceOf[Boolean]
+ ),
+ Seq(
+ null.asInstanceOf[Boolean],
+ null.asInstanceOf[Boolean],
+ null.asInstanceOf[Boolean]
+ )
+ )
+
+ override def expectedResults: Seq[Boolean] = Seq(
+ false,
+ true,
+ true,
+ null.asInstanceOf[Boolean]
+ )
+
+ override def aggregator: AggregateFunction[Boolean] = new BooleanMaxAggFunction()
+}
+
+class DecimalMaxAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq(
+ new BigDecimal("1"),
+ new BigDecimal("1000.000001"),
+ new BigDecimal("-1"),
+ new BigDecimal("-999.998999"),
+ null,
+ new BigDecimal("0"),
+ new BigDecimal("-999.999"),
+ null,
+ new BigDecimal("999.999")
+ ),
+ Seq(
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ )
+
+ override def expectedResults: Seq[BigDecimal] = Seq(
+ new BigDecimal("1000.000001"),
+ null
+ )
+
+ override def aggregator: AggregateFunction[BigDecimal] = new DecimalMaxAggFunction()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
new file mode 100644
index 0000000..6a6e5b9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionTest.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+ * Test case for built-in max aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class MinAggFunctionTest[T: Numeric] extends AggFunctionTestBase[T] {
+
+ private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+ def minVal: T
+
+ def maxVal: T
+
+ override def inputValueSets: Seq[Seq[T]] = Seq(
+ Seq(
+ numeric.fromInt(1),
+ null.asInstanceOf[T],
+ maxVal,
+ numeric.fromInt(-99),
+ numeric.fromInt(3),
+ numeric.fromInt(56),
+ numeric.fromInt(0),
+ minVal,
+ numeric.fromInt(-20),
+ numeric.fromInt(17),
+ null.asInstanceOf[T]
+ ),
+ Seq(
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T]
+ )
+ )
+
+ override def expectedResults: Seq[T] = Seq(
+ minVal,
+ null.asInstanceOf[T]
+ )
+}
+
+class ByteMinAggFunctionTest extends MinAggFunctionTest[Byte] {
+
+ override def minVal = (Byte.MinValue + 1).toByte
+
+ override def maxVal = (Byte.MaxValue - 1).toByte
+
+ override def aggregator: AggregateFunction[Byte] = new ByteMinAggFunction()
+}
+
+class ShortMinAggFunctionTest extends MinAggFunctionTest[Short] {
+
+ override def minVal = (Short.MinValue + 1).toShort
+
+ override def maxVal = (Short.MaxValue - 1).toShort
+
+ override def aggregator: AggregateFunction[Short] = new ShortMinAggFunction()
+}
+
+class IntMinAggFunctionTest extends MinAggFunctionTest[Int] {
+
+ override def minVal = Int.MinValue + 1
+
+ override def maxVal = Int.MaxValue - 1
+
+ override def aggregator: AggregateFunction[Int] = new IntMinAggFunction()
+}
+
+class LongMinAggFunctionTest extends MinAggFunctionTest[Long] {
+
+ override def minVal = Long.MinValue + 1
+
+ override def maxVal = Long.MaxValue - 1
+
+ override def aggregator: AggregateFunction[Long] = new LongMinAggFunction()
+}
+
+class FloatMinAggFunctionTest extends MinAggFunctionTest[Float] {
+
+ override def minVal = Float.MinValue / 2
+
+ override def maxVal = Float.MaxValue / 2
+
+ override def aggregator: AggregateFunction[Float] = new FloatMinAggFunction()
+}
+
+class DoubleMinAggFunctionTest extends MinAggFunctionTest[Double] {
+
+ override def minVal = Double.MinValue / 2
+
+ override def maxVal = Double.MaxValue / 2
+
+ override def aggregator: AggregateFunction[Double] = new DoubleMinAggFunction()
+}
+
+class BooleanMinAggFunctionTest extends AggFunctionTestBase[Boolean] {
+
+ override def inputValueSets: Seq[Seq[Boolean]] = Seq(
+ Seq(
+ false,
+ false,
+ false
+ ),
+ Seq(
+ true,
+ true,
+ true
+ ),
+ Seq(
+ true,
+ false,
+ null.asInstanceOf[Boolean],
+ true,
+ false,
+ true,
+ null.asInstanceOf[Boolean]
+ ),
+ Seq(
+ null.asInstanceOf[Boolean],
+ null.asInstanceOf[Boolean],
+ null.asInstanceOf[Boolean]
+ )
+ )
+
+ override def expectedResults: Seq[Boolean] = Seq(
+ false,
+ true,
+ false,
+ null.asInstanceOf[Boolean]
+ )
+
+ override def aggregator: AggregateFunction[Boolean] = new BooleanMinAggFunction()
+}
+
+class DecimalMinAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq(
+ new BigDecimal("1"),
+ new BigDecimal("1000"),
+ new BigDecimal("-1"),
+ new BigDecimal("-999.998999"),
+ null,
+ new BigDecimal("0"),
+ new BigDecimal("-999.999"),
+ null,
+ new BigDecimal("999.999")
+ ),
+ Seq(
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ )
+
+ override def expectedResults: Seq[BigDecimal] = Seq(
+ new BigDecimal("-999.999"),
+ null
+ )
+
+ override def aggregator: AggregateFunction[BigDecimal] = new DecimalMinAggFunction()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd8ef550/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
new file mode 100644
index 0000000..95feddd
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunctionTest.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+import org.apache.flink.table.functions.AggregateFunction
+
+/**
+ * Test case for built-in sum aggregate function
+ *
+ * @tparam T the type for the aggregation result
+ */
+abstract class SumAggFunctionTestBase[T: Numeric] extends AggFunctionTestBase[T] {
+
+ private val numeric: Numeric[T] = implicitly[Numeric[T]]
+
+ def maxVal: T
+
+ private val minVal = numeric.negate(maxVal)
+
+ override def inputValueSets: Seq[Seq[T]] = Seq(
+ Seq(
+ minVal,
+ numeric.fromInt(1),
+ null.asInstanceOf[T],
+ numeric.fromInt(2),
+ numeric.fromInt(3),
+ numeric.fromInt(4),
+ numeric.fromInt(5),
+ numeric.fromInt(-10),
+ numeric.fromInt(-20),
+ numeric.fromInt(17),
+ null.asInstanceOf[T],
+ maxVal
+ ),
+ Seq(
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T]
+ )
+ )
+
+ override def expectedResults: Seq[T] = Seq(
+ numeric.fromInt(2),
+ null.asInstanceOf[T]
+ )
+}
+
+class ByteSumAggFunctionTest extends SumAggFunctionTestBase[Byte] {
+
+ override def maxVal = (Byte.MaxValue / 2).toByte
+
+ override def aggregator: AggregateFunction[Byte] = new ByteSumAggFunction
+}
+
+class ShortSumAggFunctionTest extends SumAggFunctionTestBase[Short] {
+
+ override def maxVal = (Short.MaxValue / 2).toShort
+
+ override def aggregator: AggregateFunction[Short] = new ShortSumAggFunction
+}
+
+class IntSumAggFunctionTest extends SumAggFunctionTestBase[Int] {
+
+ override def maxVal = Int.MaxValue / 2
+
+ override def aggregator: AggregateFunction[Int] = new IntSumAggFunction
+}
+
+class LongSumAggFunctionTest extends SumAggFunctionTestBase[Long] {
+
+ override def maxVal = Long.MaxValue / 2
+
+ override def aggregator: AggregateFunction[Long] = new LongSumAggFunction
+}
+
+class FloatSumAggFunctionTest extends SumAggFunctionTestBase[Float] {
+
+ override def maxVal = 12345.6789f
+
+ override def aggregator: AggregateFunction[Float] = new FloatSumAggFunction
+}
+
+class DoubleSumAggFunctionTest extends SumAggFunctionTestBase[Double] {
+
+ override def maxVal = 12345.6789d
+
+ override def aggregator: AggregateFunction[Double] = new DoubleSumAggFunction
+}
+
+
+class DecimalSumAggFunctionTest extends AggFunctionTestBase[BigDecimal] {
+
+ override def inputValueSets: Seq[Seq[_]] = Seq(
+ Seq(
+ new BigDecimal("1"),
+ new BigDecimal("2"),
+ new BigDecimal("3"),
+ null,
+ new BigDecimal("0"),
+ new BigDecimal("-1000"),
+ new BigDecimal("0.000000000002"),
+ new BigDecimal("1000"),
+ new BigDecimal("-0.000000000001"),
+ new BigDecimal("999.999"),
+ null,
+ new BigDecimal("4"),
+ new BigDecimal("-999.999"),
+ null
+ ),
+ Seq(
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ )
+
+ override def expectedResults: Seq[BigDecimal] = Seq(
+ new BigDecimal("10.000000000001"),
+ null
+ )
+
+ override def aggregator: AggregateFunction[BigDecimal] = new DecimalSumAggFunction()
+}
+
+