You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/17 16:29:26 UTC
[flink] branch master updated: [FLINK-17649][table-planner-blink]
Fix hash aggregate NPE
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cc8bf34 [FLINK-17649][table-planner-blink] Fix hash aggregate NPE
cc8bf34 is described below
commit cc8bf3426a0d43560d3e55337ceab48f80db43f7
Author: Shuo Cheng <mf...@smail.nju.edu.cn>
AuthorDate: Mon May 18 00:28:48 2020 +0800
[FLINK-17649][table-planner-blink] Fix hash aggregate NPE
Generated hash aggregate code may produce NPE in some cases existing an aggregate call with Filter.
This closes #12119
---
.../codegen/agg/batch/HashAggCodeGenHelper.scala | 6 +++++-
.../batch/sql/agg/AggregateITCaseBase.scala | 23 +++++++++++++++++++++-
2 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 59f663b..abc5791 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -465,8 +465,12 @@ object HashAggCodeGenHelper {
|""".stripMargin.trim
if (filterArg >= 0) {
+ var filterTerm = s"$inputTerm.getBoolean($filterArg)"
+ if (ctx.nullCheck) {
+ filterTerm = s"!$inputTerm.isNullAt($filterArg) && " + filterTerm
+ }
s"""
- |if ($inputTerm.getBoolean($filterArg)) {
+ |if ($filterTerm) {
| $innerCode
|}
""".stripMargin
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
index 9d9a284..eeb3188 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala
@@ -27,7 +27,6 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TestData._
import org.apache.flink.types.Row
-
import org.junit.{Before, Test}
import scala.collection.Seq
@@ -123,6 +122,28 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase {
}
@Test
+ def testSimpleAndDistinctAggWithCommonFilter(): Unit = {
+ val sql =
+ """
+ |SELECT
+ | h,
+ | COUNT(1) FILTER(WHERE d > 1),
+ | COUNT(1) FILTER(WHERE d < 2),
+ | COUNT(DISTINCT e) FILTER(WHERE d > 1)
+ |FROM Table5
+ |GROUP BY h
+ |""".stripMargin
+ checkResult(
+ sql,
+ Seq(
+ row(1,0,1,4),
+ row(2,0,0,7),
+ row(3,0,0,3)
+ )
+ )
+ }
+
+ @Test
def testTwoPhasesAggregation(): Unit = {
checkResult(
"SELECT sum(d), avg(d), count(g), min(e), h FROM Table5 GROUP BY h",