You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/06/14 15:58:46 UTC

[2/2] flink git commit: [FLINK-3971] [tableAPI] Fix handling of null values in aggregations.

[FLINK-3971] [tableAPI] Fix handling of null values in aggregations.

This closes #2049


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdf43609
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdf43609
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdf43609

Branch: refs/heads/master
Commit: fdf43609977c807deb3bb81bf1095efb721fe688
Parents: e0b9e8d
Author: gallenvara <ga...@126.com>
Authored: Mon May 30 18:30:07 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jun 14 15:05:32 2016 +0200

----------------------------------------------------------------------
 .../table/runtime/aggregate/AvgAggregate.scala  | 60 ++++++++++++++++----
 .../table/runtime/aggregate/MaxAggregate.scala  | 42 ++++++--------
 .../table/runtime/aggregate/MinAggregate.scala  | 44 +++++++-------
 .../table/runtime/aggregate/SumAggregate.scala  | 12 +++-
 .../runtime/aggregate/AvgAggregateTest.scala    | 11 +++-
 .../runtime/aggregate/CountAggregateTest.scala  |  5 +-
 .../runtime/aggregate/MaxAggregateTest.scala    | 25 +++++++-
 .../runtime/aggregate/MinAggregateTest.scala    | 25 +++++++-
 .../runtime/aggregate/SumAggregateTest.scala    | 14 ++++-
 9 files changed, 165 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
index 8cf181a..e724648 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
@@ -59,11 +59,17 @@ abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
   }
 
+  override def evaluate(buffer : Row): T = {
+    doEvaluate(buffer).asInstanceOf[T]
+  }
+
   override def intermediateDataType = Array(
     BasicTypeInfo.LONG_TYPE_INFO,
     BasicTypeInfo.LONG_TYPE_INFO)
 
   def doPrepare(value: Any, partial: Row): Unit
+
+  def doEvaluate(buffer: Row): Any
 }
 
 class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
@@ -73,10 +79,14 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
     partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Byte = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    (bufferSum / bufferCount).toByte
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toByte
+    }
   }
 }
 
@@ -88,10 +98,14 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
     partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Short = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    (bufferSum / bufferCount).toShort
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toShort
+    }
   }
 }
 
@@ -103,10 +117,14 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] {
     partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Int = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    (bufferSum / bufferCount).toInt
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toInt
+    }
   }
 }
 
@@ -145,10 +163,14 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] {
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
   }
 
-  override def evaluate(buffer: Row): Long = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+    if (bufferCount == 0L) {
+      null
+    } else {
+      bufferSum.divide(BigInteger.valueOf(bufferCount)).longValue()
+    }
   }
 }
 
@@ -178,11 +200,17 @@ abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] {
     buffer.setField(partialCountIndex, partialCount + bufferCount)
   }
 
+  override def evaluate(buffer : Row): T = {
+    doEvaluate(buffer).asInstanceOf[T]
+  }
+
   override def intermediateDataType = Array(
     BasicTypeInfo.DOUBLE_TYPE_INFO,
     BasicTypeInfo.LONG_TYPE_INFO)
 
   def doPrepare(value: Any, partial: Row): Unit
+
+  def doEvaluate(buffer: Row): Any
 }
 
 class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
@@ -194,10 +222,14 @@ class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
   }
 
 
-  override def evaluate(buffer: Row): Float = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    (bufferSum / bufferCount).toFloat
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount).toFloat
+    }
   }
 }
 
@@ -209,9 +241,13 @@ class DoubleAvgAggregate extends FloatingAvgAggregate[Double] {
     partial.setField(partialCountIndex, 1L)
   }
 
-  override def evaluate(buffer: Row): Double = {
+  override def doEvaluate(buffer: Row): Any = {
     val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
     val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
-    (bufferSum / bufferCount)
+    if (bufferCount == 0L) {
+      null
+    } else {
+      (bufferSum / bufferCount)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
index 9ad0468..b9b86d1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
@@ -25,6 +25,15 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
   protected var maxIndex = -1
 
   /**
+   * Initiate the intermediate aggregate value in Row.
+   *
+   * @param intermediate The intermediate aggregate row to initiate.
+   */
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(maxIndex, null)
+  }
+
+  /**
    * Accessed in MapFunction, prepare the input of partial aggregate.
    *
    * @param value
@@ -47,9 +56,15 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
    */
   override def merge(intermediate: Row, buffer: Row): Unit = {
     val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T]
-    val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
-    val max: T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
-    buffer.setField(maxIndex, max)
+    if (partialValue != null) {
+      val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
+      if (bufferValue != null) {
+        val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
+        buffer.setField(maxIndex, max)
+      } else {
+        buffer.setField(maxIndex, partialValue)
+      }
+    }
   }
 
   /**
@@ -73,61 +88,40 @@ class ByteMaxAggregate extends MaxAggregate[Byte] {
 
   override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, Byte.MinValue)
-  }
 }
 
 class ShortMaxAggregate extends MaxAggregate[Short] {
 
   override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, Short.MinValue)
-  }
 }
 
 class IntMaxAggregate extends MaxAggregate[Int] {
 
   override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, Int.MinValue)
-  }
 }
 
 class LongMaxAggregate extends MaxAggregate[Long] {
 
   override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, Long.MinValue)
-  }
 }
 
 class FloatMaxAggregate extends MaxAggregate[Float] {
 
   override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, Float.MinValue)
-  }
 }
 
 class DoubleMaxAggregate extends MaxAggregate[Double] {
 
   override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, Double.MinValue)
-  }
 }
 
 class BooleanMaxAggregate extends MaxAggregate[Boolean] {
 
   override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(maxIndex, false)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
index b607e6b..5d656f4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
@@ -20,11 +20,20 @@ package org.apache.flink.api.table.runtime.aggregate
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.table.Row
 
-abstract  class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T]{
+abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
 
   protected var minIndex: Int = _
 
   /**
+   * Initiate the intermediate aggregate value in Row.
+   *
+   * @param intermediate The intermediate aggregate row to initiate.
+   */
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(minIndex, null)
+  }
+
+  /**
    * Accessed in MapFunction, prepare the input of partial aggregate.
    *
    * @param value
@@ -47,9 +56,15 @@ abstract  class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T]{
    */
   override def merge(partial: Row, buffer: Row): Unit = {
     val partialValue = partial.productElement(minIndex).asInstanceOf[T]
-    val bufferValue = buffer.productElement(minIndex).asInstanceOf[T]
-    val min: T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
-    buffer.setField(minIndex, min)
+    if (partialValue != null) {
+      val bufferValue = buffer.productElement(minIndex).asInstanceOf[T]
+      if (bufferValue != null) {
+        val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
+        buffer.setField(minIndex, min)
+      } else {
+        buffer.setField(minIndex, partialValue)
+      }
+    }
   }
 
   /**
@@ -73,61 +88,40 @@ class ByteMinAggregate extends MinAggregate[Byte] {
 
   override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, Byte.MaxValue)
-  }
 }
 
 class ShortMinAggregate extends MinAggregate[Short] {
 
   override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, Short.MaxValue)
-  }
 }
 
 class IntMinAggregate extends MinAggregate[Int] {
 
   override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, Int.MaxValue)
-  }
 }
 
 class LongMinAggregate extends MinAggregate[Long] {
 
   override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, Long.MaxValue)
-  }
 }
 
 class FloatMinAggregate extends MinAggregate[Float] {
 
   override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, Float.MaxValue)
-  }
 }
 
 class DoubleMinAggregate extends MinAggregate[Double] {
 
   override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, Double.MaxValue)
-  }
 }
 
 class BooleanMinAggregate extends MinAggregate[Boolean] {
 
   override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
 
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, true)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
index b4c56fe..6db6632 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
@@ -27,13 +27,19 @@ abstract class SumAggregate[T: Numeric]
   protected var sumIndex: Int = _
 
   override def initiate(partial: Row): Unit = {
-    partial.setField(sumIndex, numeric.zero)
+    partial.setField(sumIndex, null)
   }
 
   override def merge(partial1: Row, buffer: Row): Unit = {
     val partialValue = partial1.productElement(sumIndex).asInstanceOf[T]
-    val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T]
-    buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
+    if (partialValue != null) {
+      val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T]
+      if (bufferValue != null) {
+        buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
+      } else {
+        buffer.setField(sumIndex, partialValue)
+      }
+    }
   }
 
   override def evaluate(buffer: Row): T = {

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
index 2575fa2..48dc313 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregateTest.scala
@@ -56,13 +56,22 @@ abstract class AvgAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
       numeric.negate(maxVal),
       numeric.negate(minVal),
       null.asInstanceOf[T]
+    ),
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
     )
   )
 
   override def expectedResults: Seq[T] = Seq(
     minVal,
     maxVal,
-    numeric.fromInt(0)
+    numeric.fromInt(0),
+    null.asInstanceOf[T]
   )
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
index ce27d7c..4389a3a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregateTest.scala
@@ -21,10 +21,11 @@ package org.apache.flink.api.table.runtime.aggregate
 class CountAggregateTest extends AggregateTestBase[Long] {
 
   override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq("a", "b", null, "c", null, "d", "e", null, "f")
+    Seq("a", "b", null, "c", null, "d", "e", null, "f"),
+    Seq(null, null, null, null, null, null)
   )
 
-  override def expectedResults: Seq[Long] = Seq(6L)
+  override def expectedResults: Seq[Long] = Seq(6L, 0L)
 
   override def aggregator: Aggregate[Long] = new CountAggregate()
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
index e049e49..97385ae 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregateTest.scala
@@ -38,10 +38,21 @@ abstract class MaxAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
       numeric.fromInt(-20),
       numeric.fromInt(17),
       null.asInstanceOf[T]
+    ),
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
     )
   )
 
-  override def expectedResults: Seq[T] = Seq(maxVal)
+  override def expectedResults: Seq[T] = Seq(
+    maxVal,
+    null.asInstanceOf[T]
+  )
 }
 
 class ByteMaxAggregateTest extends MaxAggregateTestBase[Byte] {
@@ -113,10 +124,20 @@ class BooleanMaxAggregateTest extends AggregateTestBase[Boolean] {
       false,
       true,
       null.asInstanceOf[Boolean]
+    ),
+    Seq(
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean]
     )
   )
 
-  override def expectedResults: Seq[Boolean] = Seq(false, true, true)
+  override def expectedResults: Seq[Boolean] = Seq(
+    false,
+    true,
+    true,
+    null.asInstanceOf[Boolean]
+  )
 
   override def aggregator: Aggregate[Boolean] = new BooleanMaxAggregate()
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
index 7cf7bb1..cd77c10 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregateTest.scala
@@ -38,10 +38,21 @@ abstract class MinAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
       numeric.fromInt(-20),
       numeric.fromInt(17),
       null.asInstanceOf[T]
+    ),
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
     )
   )
 
-  override def expectedResults: Seq[T] = Seq(minVal)
+  override def expectedResults: Seq[T] = Seq(
+    minVal,
+    null.asInstanceOf[T]
+  )
 }
 
 class ByteMinAggregateTest extends MinAggregateTestBase[Byte] {
@@ -113,10 +124,20 @@ class BooleanMinAggregateTest extends AggregateTestBase[Boolean] {
       false,
       true,
       null.asInstanceOf[Boolean]
+    ),
+    Seq(
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean],
+      null.asInstanceOf[Boolean]
     )
   )
 
-  override def expectedResults: Seq[Boolean] = Seq(false, true, false)
+  override def expectedResults: Seq[Boolean] = Seq(
+    false,
+    true,
+    false,
+    null.asInstanceOf[Boolean]
+  )
 
   override def aggregator: Aggregate[Boolean] = new BooleanMinAggregate()
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fdf43609/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
index f5de3fc..fb6fc39 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregateTest.scala
@@ -39,11 +39,21 @@ abstract class SumAggregateTestBase[T: Numeric] extends AggregateTestBase[T] {
       numeric.fromInt(17),
       null.asInstanceOf[T],
       maxVal
+    ),
+    Seq(
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T],
+      null.asInstanceOf[T]
     )
   )
 
-  override def expectedResults: Seq[T] = Seq(numeric.fromInt(2))
-
+  override def expectedResults: Seq[T] = Seq(
+    numeric.fromInt(2),
+    null.asInstanceOf[T]
+  )
 }
 
 class ByteSumAggregateTest extends SumAggregateTestBase[Byte] {