You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/03 13:27:35 UTC

[3/3] flink git commit: [hotfix] [table] Fix initialization of accumulators for MIN and MAX aggregates.

[hotfix] [table] Fix initialization of accumulators for MIN and MAX aggregates.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1721bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1721bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1721bb

Branch: refs/heads/master
Commit: 2d1721bb9b17333c3c06e3675a24d344aed3c87f
Parents: 050f9a4
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Mar 2 22:57:47 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 3 14:27:08 2017 +0100

----------------------------------------------------------------------
 .../functions/aggfunctions/MaxAggFunction.scala | 21 +++++++++++++++-----
 .../functions/aggfunctions/MinAggFunction.scala | 21 +++++++++++++++-----
 2 files changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d1721bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
index 62ff88c..33cfd65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala
@@ -26,10 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
 
 /** The initial accumulator for Max aggregate function */
-class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
-  f0 = 0.asInstanceOf[T] //max
-  f1 = false
-}
+class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
 
 /**
   * Base class for built-in Max aggregate function
@@ -39,7 +36,10 @@ class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
 abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
 
   override def createAccumulator(): Accumulator = {
-    new MaxAccumulator[T]
+    val acc = new MaxAccumulator[T]
+    acc.f0 = getInitValue
+    acc.f1 = false
+    acc
   }
 
   override def accumulate(accumulator: Accumulator, value: Any): Unit = {
@@ -82,6 +82,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
       BasicTypeInfo.BOOLEAN_TYPE_INFO)
   }
 
+  def getInitValue: T
+
   def getValueTypeInfo: TypeInformation[_]
 }
 
@@ -89,6 +91,7 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   * Built-in Byte Max aggregate function
   */
 class ByteMaxAggFunction extends MaxAggFunction[Byte] {
+  override def getInitValue: Byte = 0.toByte
   override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
 }
 
@@ -96,6 +99,7 @@ class ByteMaxAggFunction extends MaxAggFunction[Byte] {
   * Built-in Short Max aggregate function
   */
 class ShortMaxAggFunction extends MaxAggFunction[Short] {
+  override def getInitValue: Short = 0.toShort
   override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
 }
 
@@ -103,6 +107,7 @@ class ShortMaxAggFunction extends MaxAggFunction[Short] {
   * Built-in Int Max aggregate function
   */
 class IntMaxAggFunction extends MaxAggFunction[Int] {
+  override def getInitValue: Int = 0
   override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
 }
 
@@ -110,6 +115,7 @@ class IntMaxAggFunction extends MaxAggFunction[Int] {
   * Built-in Long Max aggregate function
   */
 class LongMaxAggFunction extends MaxAggFunction[Long] {
+  override def getInitValue: Long = 0L
   override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
 }
 
@@ -117,6 +123,7 @@ class LongMaxAggFunction extends MaxAggFunction[Long] {
   * Built-in Float Max aggregate function
   */
 class FloatMaxAggFunction extends MaxAggFunction[Float] {
+  override def getInitValue: Float = 0.0f
   override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
 }
 
@@ -124,6 +131,7 @@ class FloatMaxAggFunction extends MaxAggFunction[Float] {
   * Built-in Double Max aggregate function
   */
 class DoubleMaxAggFunction extends MaxAggFunction[Double] {
+  override def getInitValue: Double = 0.0d
   override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
 }
 
@@ -131,6 +139,7 @@ class DoubleMaxAggFunction extends MaxAggFunction[Double] {
   * Built-in Boolean Max aggregate function
   */
 class BooleanMaxAggFunction extends MaxAggFunction[Boolean] {
+  override def getInitValue = false
   override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
 }
 
@@ -150,5 +159,7 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] {
     }
   }
 
+  override def getInitValue = BigDecimal.ZERO
+
   override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1721bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
index cddb873..1b2d6b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala
@@ -26,10 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
 
 /** The initial accumulator for Min aggregate function */
-class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
-  f0 = 0.asInstanceOf[T] //min
-  f1 = false
-}
+class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
 
 /**
   * Base class for built-in Min aggregate function
@@ -39,7 +36,10 @@ class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator {
 abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
 
   override def createAccumulator(): Accumulator = {
-    new MinAccumulator[T]
+    val acc = new MinAccumulator[T]
+    acc.f0 = getInitValue
+    acc.f1 = false
+    acc
   }
 
   override def accumulate(accumulator: Accumulator, value: Any): Unit = {
@@ -82,6 +82,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
       BasicTypeInfo.BOOLEAN_TYPE_INFO)
   }
 
+  def getInitValue: T
+
   def getValueTypeInfo: TypeInformation[_]
 }
 
@@ -89,6 +91,7 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun
   * Built-in Byte Min aggregate function
   */
 class ByteMinAggFunction extends MinAggFunction[Byte] {
+  override def getInitValue: Byte = 0.toByte
   override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
 }
 
@@ -96,6 +99,7 @@ class ByteMinAggFunction extends MinAggFunction[Byte] {
   * Built-in Short Min aggregate function
   */
 class ShortMinAggFunction extends MinAggFunction[Short] {
+  override def getInitValue: Short = 0.toShort
   override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
 }
 
@@ -103,6 +107,7 @@ class ShortMinAggFunction extends MinAggFunction[Short] {
   * Built-in Int Min aggregate function
   */
 class IntMinAggFunction extends MinAggFunction[Int] {
+  override def getInitValue: Int = 0
   override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
 }
 
@@ -110,6 +115,7 @@ class IntMinAggFunction extends MinAggFunction[Int] {
   * Built-in Long Min aggregate function
   */
 class LongMinAggFunction extends MinAggFunction[Long] {
+  override def getInitValue: Long = 0L
   override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
 }
 
@@ -117,6 +123,7 @@ class LongMinAggFunction extends MinAggFunction[Long] {
   * Built-in Float Min aggregate function
   */
 class FloatMinAggFunction extends MinAggFunction[Float] {
+  override def getInitValue: Float = 0.0f
   override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
 }
 
@@ -124,6 +131,7 @@ class FloatMinAggFunction extends MinAggFunction[Float] {
   * Built-in Double Min aggregate function
   */
 class DoubleMinAggFunction extends MinAggFunction[Double] {
+  override def getInitValue: Double = 0.0d
   override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
 }
 
@@ -131,6 +139,7 @@ class DoubleMinAggFunction extends MinAggFunction[Double] {
   * Built-in Boolean Min aggregate function
   */
 class BooleanMinAggFunction extends MinAggFunction[Boolean] {
+  override def getInitValue: Boolean = false
   override def getValueTypeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
 }
 
@@ -150,5 +159,7 @@ class DecimalMinAggFunction extends MinAggFunction[BigDecimal] {
     }
   }
 
+  override def getInitValue: BigDecimal = BigDecimal.ZERO
+
   override def getValueTypeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO
 }