You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/01/07 09:35:12 UTC
[flink] branch master updated: [FLINK-10845] [table] Support multiple different DISTINCT aggregates for batch
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c0a37f8 [FLINK-10845] [table] Support multiple different DISTINCT aggregates for batch
c0a37f8 is described below
commit c0a37f831eeee600cba0f3aa5bc9fe7414667144
Author: xueyu <27...@qq.com>
AuthorDate: Fri Nov 23 23:10:28 2018 +0800
[FLINK-10845] [table] Support multiple different DISTINCT aggregates for batch
This closes #7079.
---
.../apache/flink/table/codegen/CodeGenerator.scala | 5 +++
.../table/codegen/calls/ScalarOperators.scala | 40 +++++++++++++++++++++-
.../table/expressions/ScalarOperatorsTest.scala | 4 +++
.../table/runtime/batch/sql/AggregateITCase.scala | 37 ++++++++++++++++++++
4 files changed, 85 insertions(+), 1 deletion(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 6c05ff0..5439937 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -838,6 +838,11 @@ abstract class CodeGenerator(
val right = operands(1)
generateEquals(nullCheck, left, right)
+ case IS_NOT_DISTINCT_FROM =>
+ val left = operands.head
+ val right = operands(1)
+ generateIsNotDistinctFrom(nullCheck, left, right);
+
case NOT_EQUALS =>
val left = operands.head
val right = operands(1)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 5a185da..a9cd315 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -191,7 +191,45 @@ object ScalarOperators {
)
)
}
- }
+ }
+ /** Generates the expression for the `IS NOT DISTINCT FROM` comparison of `left` and `right`. */
+ def generateIsNotDistinctFrom(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ // With nullCheck: both null -> true, exactly one null -> false, otherwise the equality result.
+ if (nullCheck) {
+ val resultTerm = newName("result")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(BOOLEAN_TYPE_INFO)
+ val equalExpression = generateEquals(
+ nullCheck = false,
+ left.copy(code = GeneratedExpression.NO_CODE),
+ right.copy(code = GeneratedExpression.NO_CODE))
+ // Equality is built on NO_CODE copies; the operands' own code is emitted once at the top below.
+ val resultCode =
+ s"""
+ |${left.code}
+ |${right.code}
+ |$resultTypeTerm $resultTerm;
+ |if (${left.nullTerm}) {
+ | $resultTerm = ${right.nullTerm};
+ |} else if (${right.nullTerm}) {
+ | $resultTerm = ${left.nullTerm};
+ |} else {
+ | ${equalExpression.code}
+ | $resultTerm = ${equalExpression.resultTerm};
+ |}
+ |""".stripMargin
+ // Every branch of the snippet assigns the result term; the expression is marked NEVER_NULL.
+ GeneratedExpression(
+ resultTerm,
+ GeneratedExpression.NEVER_NULL,
+ resultCode, BOOLEAN_TYPE_INFO)
+ } else {
+ generateEquals(nullCheck = false, left, right)
+ }
+ }
def generateEquals(
nullCheck: Boolean,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index de4c804..d61627b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -406,6 +406,10 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
"((((true) === true) || false).cast(STRING) + 'X ').trim",
"trueX")
testTableApi(12.isNull, "12.isNull", "false")
+
+ testSqlApi("f12 IS NOT DISTINCT FROM NULL", "true")
+ testSqlApi("f9 IS NOT DISTINCT FROM NULL", "false")
+ testSqlApi("f9 IS NOT DISTINCT FROM 10", "true")
}
@Test
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index 09ccfc4..fff252f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -559,4 +559,41 @@ class AggregateITCase(
TestBaseUtils.compareResultAsText(result.asJava, expected)
}
+ /** Runs one grouped batch SQL query with several DISTINCT aggregates over different arguments. */
+ @Test
+ def testMultipleDistinctWithDiffParams(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+ // Inner query: rows with b = 2 get a null b, so one grouping key of the outer query is null.
+ val sqlWithNull = "SELECT a, " +
+ " CASE WHEN b = 2 THEN null ELSE b END AS b, " +
+ " c FROM MyTable"
+ // Outer query: group by b and apply four DISTINCT aggregates, each over a different expression.
+ val sqlQuery =
+ "SELECT b, " +
+ " COUNT(DISTINCT b), " +
+ " SUM(DISTINCT (a / 3)), " +
+ " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
+ " COUNT(DISTINCT c) " +
+ "FROM (" +
+ sqlWithNull +
+ ") GROUP BY b " +
+ "ORDER BY b"
+ // The 3-tuple data set is exposed to SQL as table MyTable with columns a, b and c.
+ val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
+ // Expected rows as text, one per group; the last row is the group whose key is null.
+ val expected = Seq(
+ "1,1,0,1,1",
+ "3,1,3,3,3",
+ "4,1,5,1,4",
+ "5,1,12,1,5",
+ "6,1,18,1,6",
+ "null,0,1,1,2"
+ ).mkString("\n")
+
+ TestBaseUtils.compareResultAsText(result.asJava, expected)
+ }
}