You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/17 16:29:26 UTC

[flink] branch master updated: [FLINK-17649][table-planner-blink] Fix hash aggregate NPE

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cc8bf34  [FLINK-17649][table-planner-blink] Fix hash aggregate NPE
cc8bf34 is described below

commit cc8bf3426a0d43560d3e55337ceab48f80db43f7
Author: Shuo Cheng <mf...@smail.nju.edu.cn>
AuthorDate: Mon May 18 00:28:48 2020 +0800

    [FLINK-17649][table-planner-blink] Fix hash aggregate NPE
    
    The generated hash aggregate code may throw an NPE in some cases when an aggregate call has a FILTER clause.
    
    This closes #12119
---
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  6 +++++-
 .../batch/sql/agg/AggregateITCaseBase.scala        | 23 +++++++++++++++++++++-
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 59f663b..abc5791 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -465,8 +465,12 @@ object HashAggCodeGenHelper {
              |""".stripMargin.trim
 
         if (filterArg >= 0) {
+          var filterTerm = s"$inputTerm.getBoolean($filterArg)"
+          if (ctx.nullCheck) {
+            filterTerm = s"!$inputTerm.isNullAt($filterArg) && " + filterTerm
+          }
           s"""
-             |if ($inputTerm.getBoolean($filterArg)) {
+             |if ($filterTerm) {
              | $innerCode
              |}
           """.stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
index 9d9a284..eeb3188 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
@@ -27,7 +27,6 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Test}
 
 import scala.collection.Seq
@@ -123,6 +122,28 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
   }
 
   @Test
+  def testSimpleAndDistinctAggWithCommonFilter(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |   h,
+        |   COUNT(1) FILTER(WHERE d > 1),
+        |   COUNT(1) FILTER(WHERE d < 2),
+        |   COUNT(DISTINCT e) FILTER(WHERE d > 1)
+        |FROM Table5
+        |GROUP BY h
+        |""".stripMargin
+    checkResult(
+      sql,
+      Seq(
+        row(1,0,1,4),
+        row(2,0,0,7),
+        row(3,0,0,3)
+      )
+    )
+  }
+
+  @Test
   def testTwoPhasesAggregation(): Unit = {
     checkResult(
       "SELECT sum(d), avg(d), count(g), min(e), h FROM Table5 GROUP BY h",