You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/04 04:01:53 UTC
[flink] branch release-1.11 updated:
[FLINK-16451][table-planner-blink] Fix IndexOutOfBoundsException for
DISTINCT AGG with constants
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 492a2e2 [FLINK-16451][table-planner-blink] Fix IndexOutOfBoundsException for DISTINCT AGG with constants
492a2e2 is described below
commit 492a2e22b02d9032ef6e9ff80f798458ef16211b
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jun 4 11:57:58 2020 +0800
[FLINK-16451][table-planner-blink] Fix IndexOutOfBoundsException for DISTINCT AGG with constants
This closes #12432
---
.../codegen/agg/AggsHandlerCodeGenerator.scala | 1 +
.../planner/codegen/agg/DistinctAggCodeGen.scala | 25 ++++++++++++++++------
.../runtime/stream/sql/OverWindowITCase.scala | 8 +++++--
3 files changed, 25 insertions(+), 9 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index 5fd7112..c89a8d6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -259,6 +259,7 @@ class AggsHandlerCodeGenerator(
index,
innerCodeGens,
filterExpr.toArray,
+ constantExprs,
mergedAccOffset,
aggBufferOffset,
aggBufferSize,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
index 6e04dad..ea319b3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
import org.apache.flink.table.planner.plan.utils.DistinctInfo
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
import org.apache.flink.table.types.logical.{LogicalType, RowType}
import org.apache.flink.util.Preconditions
import org.apache.flink.util.Preconditions.checkArgument
@@ -65,6 +66,7 @@ class DistinctAggCodeGen(
distinctIndex: Int,
innerAggCodeGens: Array[AggCodeGen],
filterExpressions: Array[Option[Expression]],
+ constantExpressions: Seq[GeneratedExpression],
mergedAccOffset: Int,
aggBufferOffset: Int,
aggBufferSize: Int,
@@ -373,13 +375,22 @@ class DistinctAggCodeGen(
private def generateKeyExpression(
ctx: CodeGeneratorContext,
generator: ExprCodeGenerator): GeneratedExpression = {
- val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
- ctx,
- generator.input1Type,
- generator.input1Term,
- _,
- nullableInput = false,
- deepCopy = inputFieldCopy))
+ val fieldExprs = distinctInfo.argIndexes.map(argIndex => {
+ val inputFieldCount = LogicalTypeChecks.getFieldCount(generator.input1Type)
+ if (argIndex >= inputFieldCount) {
+ // arg index to constant
+ constantExpressions(argIndex - inputFieldCount)
+ } else {
+ // arg index to input field
+ generateInputAccess(
+ ctx,
+ generator.input1Type,
+ generator.input1Term,
+ argIndex,
+ nullableInput = false,
+ deepCopy = inputFieldCopy)
+ }
+ })
// the key expression of MapView
if (fieldExprs.length > 1) {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
index b1f5bf8..2fe7e2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
@@ -281,14 +281,18 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
- "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+ "listagg(distinct c, '|') " +
+ " OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW), " +
+ "count(a) " +
+ " OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
"from T1"
val sink = new TestingAppendSink
tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
env.execute()
- val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
+ val expected = List("Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6",
+ "Hello|Hello World,7", "Hello|Hello World,8", "Hello|Hello World,9")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}