You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/06/14 15:58:46 UTC
[2/2] flink git commit: [FLINK-3971] [tableAPI] Fix handling of null values in aggregations.
[FLINK-3971] [tableAPI] Fix handling of null values in aggregations.
This closes #2049
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdf43609
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdf43609
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdf43609
Branch: refs/heads/master
Commit: fdf43609977c807deb3bb81bf1095efb721fe688
Parents: e0b9e8d
Author: gallenvara <ga...@126.com>
Authored: Mon May 30 18:30:07 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jun 14 15:05:32 2016 +0200
----------------------------------------------------------------------
.../table/runtime/aggregate/AvgAggregate.scala | 60 ++++++++++++++++----
.../table/runtime/aggregate/MaxAggregate.scala | 42 ++++++--------
.../table/runtime/aggregate/MinAggregate.scala | 44 +++++++-------
.../table/runtime/aggregate/SumAggregate.scala | 12 +++-
.../runtime/aggregate/AvgAggregateTest.scala | 11 +++-
.../runtime/aggregate/CountAggregateTest.scala | 5 +-
.../runtime/aggregate/MaxAggregateTest.scala | 25 +++++++-
.../runtime/aggregate/MinAggregateTest.scala | 25 +++++++-
.../runtime/aggregate/SumAggregateTest.scala | 14 ++++-
9 files changed, 165 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
index 8cf181a..e724648 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
@@ -59,11 +59,17 @@ abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
}
+ override def evaluate(buffer : Row): T = {
+ doEvaluate(buffer).asInstanceOf[T]
+ }
+
override def intermediateDataType = Array(
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO)
def doPrepare(value: Any, partial: Row): Unit
+
+ def doEvaluate(buffer: Row): Any
}
class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
@@ -73,10 +79,14 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
partial.setField(partialCountIndex, 1L)
}
- override def evaluate(buffer: Row): Byte = {
+ override def doEvaluate(buffer: Row): Any = {
val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
- (bufferSum / bufferCount).toByte
+ if (bufferCount == 0L) {
+ null
+ } else {
+ (bufferSum / bufferCount).toByte
+ }
}
}
@@ -88,10 +98,14 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
partial.setField(partialCountIndex, 1L)
}
- override def evaluate(buffer: Row): Short = {
+ override def doEvaluate(buffer: Row): Any = {
val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
- (bufferSum / bufferCount).toShort
+ if (bufferCount == 0L) {
+ null
+ } else {
+ (bufferSum / bufferCount).toShort
+ }
}
}
@@ -103,10 +117,14 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] {
partial.setField(partialCountIndex, 1L)
}
- override def evaluate(buffer: Row): Int = {
+ override def doEvaluate(buffer: Row): Any = {
val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
- (bufferSum / bufferCount).toInt
+ if (bufferCount == 0L) {
+ null
+ } else {
+ (bufferSum / bufferCount).toInt
+ }
}
}
@@ -145,10 +163,14 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] {
buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
}
- override def evaluate(buffer: Row): Long = {
+ override def doEvaluate(buffer: Row): Any = {
val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
- bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+ if (bufferCount == 0L) {
+ null
+ } else {
+ bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+ }
}
}
@@ -178,11 +200,17 @@ abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] {
buffer.setField(partialCountIndex, partialCount + bufferCount)
}
+ override def evaluate(buffer : Row): T = {
+ doEvaluate(buffer).asInstanceOf[T]
+ }
+
override def intermediateDataType = Array(
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO)
def doPrepare(value: Any, partial: Row): Unit
+
+ def doEvaluate(buffer: Row): Any
}
class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
@@ -194,10 +222,14 @@ class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
}
- override def evaluate(buffer: Row): Float = {
+ override def doEvaluate(buffer: Row): Any = {
val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
- (bufferSum / bufferCount).toFloat
+ if (bufferCount == 0L) {
+ null
+ } else {
+ (bufferSum / bufferCount).toFloat
+ }
}
}
@@ -209,9 +241,13 @@ class DoubleAvgAggregate extends FloatingAvgAggregate[Double] {
partial.setField(partialCountIndex, 1L)
}
- override def evaluate(buffer: Row): Double = {
+ override def doEvaluate(buffer: Row): Any = {
val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
- (bufferSum / bufferCount)
+ if (bufferCount == 0L) {
+ null
+ } else {
+ (bufferSum / bufferCount)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
index 9ad0468..b9b86d1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
@@ -25,6 +25,15 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
protected var maxIndex = -1
/**
+ * Initiate the intermediate aggregate value in Row.
+ *
+ * @param intermediate The intermediate aggregate row to initiate.
+ */
+ override def initiate(intermediate: Row): Unit = {
+ intermediate.setField(maxIndex, null)
+ }
+
+ /**
* Accessed in MapFunction, prepare the input of partial aggregate.
*
* @param value
@@ -47,9 +56,15 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
*/
override def merge(intermediate: Row, buffer: Row): Unit = {
val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T]
- val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
- val max: T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
- buffer.setField(maxIndex, max)
+ if (partialValue != null) {
+ val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
+ if (bufferValue != null) {
+ val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
+ buffer.setField(maxIndex, max)
+ } else {
+ buffer.setField(maxIndex, partialValue)
+ }
+ }
}
/**
@@ -73,61 +88,40 @@ class ByteMaxAggregate extends MaxAggregate[Byte] {
override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(maxIndex, Byte.MinValue)
- }
}
class ShortMaxAggregate extends MaxAggregate[Short] {
override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(maxIndex, Short.MinValue)
- }
}
class IntMaxAggregate extends MaxAggregate[Int] {
override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(maxIndex, Int.MinValue)
- }
}
class LongMaxAggregate extends MaxAggregate[Long] {
override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(maxIndex, Long.MinValue)
- }
}
class FloatMaxAggregate extends MaxAggregate[Float] {
override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(maxIndex, Float.MinValue)
- }
}
class DoubleMaxAggregate extends MaxAggregate[Double] {
override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(maxIndex, Double.MinValue)
- }
}
class BooleanMaxAggregate extends MaxAggregate[Boolean] {
override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(maxIndex, false)
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
index b607e6b..5d656f4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
@@ -20,11 +20,20 @@ package org.apache.flink.api.table.runtime.aggregate
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.table.Row
-abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T]{
+abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
protected var minIndex: Int = _
/**
+ * Initiate the intermediate aggregate value in Row.
+ *
+ * @param intermediate The intermediate aggregate row to initiate.
+ */
+ override def initiate(intermediate: Row): Unit = {
+ intermediate.setField(minIndex, null)
+ }
+
+ /**
* Accessed in MapFunction, prepare the input of partial aggregate.
*
* @param value
@@ -47,9 +56,15 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T]{
*/
override def merge(partial: Row, buffer: Row): Unit = {
val partialValue = partial.productElement(minIndex).asInstanceOf[T]
- val bufferValue = buffer.productElement(minIndex).asInstanceOf[T]
- val min: T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
- buffer.setField(minIndex, min)
+ if (partialValue != null) {
+ val bufferValue = buffer.productElement(minIndex).asInstanceOf[T]
+ if (bufferValue != null) {
+ val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
+ buffer.setField(minIndex, min)
+ } else {
+ buffer.setField(minIndex, partialValue)
+ }
+ }
}
/**
@@ -73,61 +88,40 @@ class ByteMinAggregate extends MinAggregate[Byte] {
override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, Byte.MaxValue)
- }
}
class ShortMinAggregate extends MinAggregate[Short] {
override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, Short.MaxValue)
- }
}
class IntMinAggregate extends MinAggregate[Int] {
override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, Int.MaxValue)
- }
}
class LongMinAggregate extends MinAggregate[Long] {
override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, Long.MaxValue)
- }
}
class FloatMinAggregate extends MinAggregate[Float] {
override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, Float.MaxValue)
- }
}
class DoubleMinAggregate extends MinAggregate[Double] {
override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, Double.MaxValue)
- }
}
class BooleanMinAggregate extends MinAggregate[Boolean] {
override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, true)
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
index b4c56fe..6db6632 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
@@ -27,13 +27,19 @@ abstract class SumAggregate[T: Numeric]
protected var sumIndex: Int = _
override def initiate(partial: Row): Unit = {
- partial.setField(sumIndex, numeric.zero)
+ partial.setField(sumIndex, null)
}
override def merge(partial1: Row, buffer: Row): Unit = {
val partialValue = partial1.productElement(sumIndex).asInstanceOf[T]
- val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T]
- buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
+ if (partialValue != null) {
+ val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T]
+ if (bufferValue != null) {
+ buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
+ } else {
+ buffer.setField(sumIndex, partialValue)
+ }
+ }
}
override def evaluate(buffer: Row): T = {
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
index 2575fa2..48dc313 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
@@ -56,13 +56,22 @@ abstract class AvgAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
numeric.negate(maxVal),
numeric.negate(minVal),
null.asInstanceOf[T]
+ ),
+ Seq(
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T]
)
)
override def expectedResults: Seq[T] = Seq(
minVal,
maxVal,
- numeric.fromInt(0)
+ numeric.fromInt(0),
+ null.asInstanceOf[T]
)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
index ce27d7c..4389a3a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
@@ -21,10 +21,11 @@ package org.apache.flink.api.table.runtime.aggregate
class CountAggregateTest extends AggregateTestBase[Long] {
override def inputValueSets: Seq[Seq[_]] = Seq(
- Seq("a", "b", null, "c", null, "d", "e", null, "f")
+ Seq("a", "b", null, "c", null, "d", "e", null, "f"),
+ Seq(null, null, null, null, null, null)
)
- override def expectedResults: Seq[Long] = Seq(6L)
+ override def expectedResults: Seq[Long] = Seq(6L, 0L)
override def aggregator: Aggregate[Long] = new CountAggregate()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
index e049e49..97385ae 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
@@ -38,10 +38,21 @@ abstract class MaxAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
numeric.fromInt(-20),
numeric.fromInt(17),
null.asInstanceOf[T]
+ ),
+ Seq(
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T]
)
)
- override def expectedResults: Seq[T] = Seq(maxVal)
+ override def expectedResults: Seq[T] = Seq(
+ maxVal,
+ null.asInstanceOf[T]
+ )
}
class ByteMaxAggregateTest extends MaxAggregateTestBase[Byte] {
@@ -113,10 +124,20 @@ class BooleanMaxAggregateTest extends AggregateTestBase[Boolean] {
false,
true,
null.asInstanceOf[Boolean]
+ ),
+ Seq(
+ null.asInstanceOf[Boolean],
+ null.asInstanceOf[Boolean],
+ null.asInstanceOf[Boolean]
)
)
- override def expectedResults: Seq[Boolean] = Seq(false, true, true)
+ override def expectedResults: Seq[Boolean] = Seq(
+ false,
+ true,
+ true,
+ null.asInstanceOf[Boolean]
+ )
override def aggregator: Aggregate[Boolean] = new BooleanMaxAggregate()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
index 7cf7bb1..cd77c10 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
@@ -38,10 +38,21 @@ abstract class MinAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
numeric.fromInt(-20),
numeric.fromInt(17),
null.asInstanceOf[T]
+ ),
+ Seq(
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T]
)
)
- override def expectedResults: Seq[T] = Seq(minVal)
+ override def expectedResults: Seq[T] = Seq(
+ minVal,
+ null.asInstanceOf[T]
+ )
}
class ByteMinAggregateTest extends MinAggregateTestBase[Byte] {
@@ -113,10 +124,20 @@ class BooleanMinAggregateTest extends AggregateTestBase[Boolean] {
false,
true,
null.asInstanceOf[Boolean]
+ ),
+ Seq(
+ null.asInstanceOf[Boolean],
+ null.asInstanceOf[Boolean],
+ null.asInstanceOf[Boolean]
)
)
- override def expectedResults: Seq[Boolean] = Seq(false, true, false)
+ override def expectedResults: Seq[Boolean] = Seq(
+ false,
+ true,
+ false,
+ null.asInstanceOf[Boolean]
+ )
override def aggregator: Aggregate[Boolean] = new BooleanMinAggregate()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
index f5de3fc..fb6fc39 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
@@ -39,11 +39,21 @@ abstract class SumAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
numeric.fromInt(17),
null.asInstanceOf[T],
maxVal
+ ),
+ Seq(
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T],
+ null.asInstanceOf[T]
)
)
- override def expectedResults: Seq[T] = Seq(numeric.fromInt(2))
-
+ override def expectedResults: Seq[T] = Seq(
+ numeric.fromInt(2),
+ null.asInstanceOf[T]
+ )
}
class ByteSumAggregateTest extends SumAggregateTestBase[Byte] {