You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/03 13:27:35 UTC
[3/3] flink git commit: [hotfix] [table] Fix initialization of
accumulators for MIN and MAX aggregates.
[hotfix] [table] Fix initialization of accumulators for MIN and MAX aggregates.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1721bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1721bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1721bb
Branch: refs/heads/master
Commit: 2d1721bb9b17333c3c06e3675a24d344aed3c87f
Parents: 050f9a4
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Mar 2 22:57:47 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 3 14:27:08 2017 +0100
----------------------------------------------------------------------
.../functions/aggfunctions/MaxAggFunction.scala | 21 +++++++++++++++-----
.../functions/aggfunctions/MinAggFunction.scala | 21 +++++++++++++++-----
2 files changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1721bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 62ff88c..33cfd65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -26,10 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
/** The initial accumulator for Max aggregate function */
-class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
- f0 = 0.asInstanceOf[T] //max
- f1 = false
-}
+class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
/**
* Base class for built-in Max aggregate function
@@ -39,7 +36,10 @@ class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
override def createAccumulator(): Accumulator = {
- new MaxAccumulator[T]
+ val acc = new MaxAccumulator[T]
+ acc.f0 = getInitValue
+ acc.f1 = false
+ acc
}
override def accumulate(accumulator: Accumulator, value: Any): Unit = {
@@ -82,6 +82,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
BasicTypeInfo.BOOLEAN_TYPE_INFO)
}
+ def getInitValue: T
+
def getValueTypeInfo: TypeInformation[_]
}
@@ -89,6 +91,7 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
* Built-in Byte Max aggregate function
*/
class ByteMaxAggFunction extends MaxAggFunction[Byte] {
+ override def getInitValue: Byte = 0.toByte
override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
}
@@ -96,6 +99,7 @@ class ByteMaxAggFunction extends MaxAggFunction[Byte] {
* Built-in Short Max aggregate function
*/
class ShortMaxAggFunction extends MaxAggFunction[Short] {
+ override def getInitValue: Short = 0.toShort
override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
}
@@ -103,6 +107,7 @@ class ShortMaxAggFunction extends MaxAggFunction[Short] {
* Built-in Int Max aggregate function
*/
class IntMaxAggFunction extends MaxAggFunction[Int] {
+ override def getInitValue: Int = 0
override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
}
@@ -110,6 +115,7 @@ class IntMaxAggFunction extends MaxAggFunction[Int] {
* Built-in Long Max aggregate function
*/
class LongMaxAggFunction extends MaxAggFunction[Long] {
+ override def getInitValue: Long = 0L
override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
}
@@ -117,6 +123,7 @@ class LongMaxAggFunction extends MaxAggFunction[Long] {
* Built-in Float Max aggregate function
*/
class FloatMaxAggFunction extends MaxAggFunction[Float] {
+ override def getInitValue: Float = 0.0f
override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
}
@@ -124,6 +131,7 @@ class FloatMaxAggFunction extends MaxAggFunction[Float] {
* Built-in Double Max aggregate function
*/
class DoubleMaxAggFunction extends MaxAggFunction[Double] {
+ override def getInitValue: Double = 0.0d
override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
}
@@ -131,6 +139,7 @@ class DoubleMaxAggFunction extends MaxAggFunction[Double] {
* Built-in Boolean Max aggregate function
*/
class BooleanMaxAggFunction extends MaxAggFunction[Boolean] {
+ override def getInitValue = false
override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
}
@@ -150,5 +159,7 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
}
}
+ override def getInitValue = BigDecimal.ZERO
+
override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1721bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
index cddb873..1b2d6b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -26,10 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
/** The initial accumulator for Min aggregate function */
-class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
- f0 = 0.asInstanceOf[T] //min
- f1 = false
-}
+class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
/**
* Base class for built-in Min aggregate function
@@ -39,7 +36,10 @@ class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
override def createAccumulator(): Accumulator = {
- new MinAccumulator[T]
+ val acc = new MinAccumulator[T]
+ acc.f0 = getInitValue
+ acc.f1 = false
+ acc
}
override def accumulate(accumulator: Accumulator, value: Any): Unit = {
@@ -82,6 +82,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
BasicTypeInfo.BOOLEAN_TYPE_INFO)
}
+ def getInitValue: T
+
def getValueTypeInfo: TypeInformation[_]
}
@@ -89,6 +91,7 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
* Built-in Byte Min aggregate function
*/
class ByteMinAggFunction extends MinAggFunction[Byte] {
+ override def getInitValue: Byte = 0.toByte
override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
}
@@ -96,6 +99,7 @@ class ByteMinAggFunction extends MinAggFunction[Byte] {
* Built-in Short Min aggregate function
*/
class ShortMinAggFunction extends MinAggFunction[Short] {
+ override def getInitValue: Short = 0.toShort
override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
}
@@ -103,6 +107,7 @@ class ShortMinAggFunction extends MinAggFunction[Short] {
* Built-in Int Min aggregate function
*/
class IntMinAggFunction extends MinAggFunction[Int] {
+ override def getInitValue: Int = 0
override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
}
@@ -110,6 +115,7 @@ class IntMinAggFunction extends MinAggFunction[Int] {
* Built-in Long Min aggregate function
*/
class LongMinAggFunction extends MinAggFunction[Long] {
+ override def getInitValue: Long = 0L
override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
}
@@ -117,6 +123,7 @@ class LongMinAggFunction extends MinAggFunction[Long] {
* Built-in Float Min aggregate function
*/
class FloatMinAggFunction extends MinAggFunction[Float] {
+ override def getInitValue: Float = 0.0f
override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
}
@@ -124,6 +131,7 @@ class FloatMinAggFunction extends MinAggFunction[Float] {
* Built-in Double Min aggregate function
*/
class DoubleMinAggFunction extends MinAggFunction[Double] {
+ override def getInitValue: Double = 0.0d
override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
}
@@ -131,6 +139,7 @@ class DoubleMinAggFunction extends MinAggFunction[Double] {
* Built-in Boolean Min aggregate function
*/
class BooleanMinAggFunction extends MinAggFunction[Boolean] {
+ override def getInitValue: Boolean = false
override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
}
@@ -150,5 +159,7 @@ class DecimalMinAggFunction extends MinAggFunction[BigDecimal] {
}
}
+ override def getInitValue: BigDecimal = BigDecimal.ZERO
+
override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
}