You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/04 04:01:53 UTC

[flink] branch release-1.11 updated: [FLINK-16451][table-planner-blink] Fix IndexOutOfBoundsException for DISTINCT AGG with constants

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 492a2e2  [FLINK-16451][table-planner-blink] Fix IndexOutOfBoundsException for DISTINCT AGG with constants
492a2e2 is described below

commit 492a2e22b02d9032ef6e9ff80f798458ef16211b
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jun 4 11:57:58 2020 +0800

    [FLINK-16451][table-planner-blink] Fix IndexOutOfBoundsException for DISTINCT AGG with constants
    
    This closes #12432
---
 .../codegen/agg/AggsHandlerCodeGenerator.scala     |  1 +
 .../planner/codegen/agg/DistinctAggCodeGen.scala   | 25 ++++++++++++++++------
 .../runtime/stream/sql/OverWindowITCase.scala      |  8 +++++--
 3 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
index 5fd7112..c89a8d6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -259,6 +259,7 @@ class AggsHandlerCodeGenerator(
           index,
           innerCodeGens,
           filterExpr.toArray,
+          constantExprs,
           mergedAccOffset,
           aggBufferOffset,
           aggBufferSize,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
index 6e04dad..ea319b3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import org.apache.flink.table.planner.plan.utils.DistinctInfo
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
 import org.apache.flink.util.Preconditions
 import org.apache.flink.util.Preconditions.checkArgument
@@ -65,6 +66,7 @@ class DistinctAggCodeGen(
   distinctIndex: Int,
   innerAggCodeGens: Array[AggCodeGen],
   filterExpressions: Array[Option[Expression]],
+  constantExpressions: Seq[GeneratedExpression],
   mergedAccOffset: Int,
   aggBufferOffset: Int,
   aggBufferSize: Int,
@@ -373,13 +375,22 @@ class DistinctAggCodeGen(
   private def generateKeyExpression(
       ctx: CodeGeneratorContext,
       generator: ExprCodeGenerator): GeneratedExpression = {
-    val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
-      ctx,
-      generator.input1Type,
-      generator.input1Term,
-      _,
-      nullableInput = false,
-      deepCopy = inputFieldCopy))
+    val fieldExprs = distinctInfo.argIndexes.map(argIndex => {
+      val inputFieldCount = LogicalTypeChecks.getFieldCount(generator.input1Type)
+      if (argIndex >= inputFieldCount) {
+        // arg index to constant
+        constantExpressions(argIndex - inputFieldCount)
+      } else {
+        // arg index to input field
+        generateInputAccess(
+          ctx,
+          generator.input1Type,
+          generator.input1Term,
+          argIndex,
+          nullableInput = false,
+          deepCopy = inputFieldCopy)
+      }
+    })
 
     // the key expression of MapView
     if (fieldExprs.length > 1) {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
index b1f5bf8..2fe7e2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
@@ -281,14 +281,18 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     tEnv.registerTable("T1", t1)
 
     val sqlQuery = "SELECT " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+      "listagg(distinct c, '|') " +
+      "  OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW), " +
+      "count(a) " +
+      "  OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
       "from T1"
 
     val sink = new TestingAppendSink
     tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
     env.execute()
 
-    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
+    val expected = List("Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6",
+      "Hello|Hello World,7", "Hello|Hello World,8", "Hello|Hello World,9")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }