You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/08/11 08:33:17 UTC

[flink] branch release-1.11 updated: [FLINK-18862][table-planner-blink] Fix LISTAGG throws BinaryRawValueData cannot be cast to StringData exception during runtime

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new a72a8cd  [FLINK-18862][table-planner-blink] Fix LISTAGG throws BinaryRawValueData cannot be cast to StringData exception during runtime
a72a8cd is described below

commit a72a8cd430a00c51ba55d7d9eed3e39dcbac3164
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Aug 11 16:32:41 2020 +0800

    [FLINK-18862][table-planner-blink] Fix LISTAGG throws BinaryRawValueData cannot be cast to StringData exception during runtime
    
    This closes #13112
---
 .../ListAggWithRetractAggFunction.java             | 27 ++++++++++++
 .../ListAggWsWithRetractAggFunction.java           | 30 +++++++++++++
 .../functions/utils/UserDefinedFunctionUtils.scala |  1 -
 .../runtime/stream/sql/AggregateITCase.scala       | 49 ++++++++++++++++++++++
 4 files changed, 106 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
index 4f90054..6b204ae 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
@@ -19,13 +19,18 @@
 package org.apache.flink.table.planner.functions.aggfunctions;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.dataview.ListViewTypeInfo;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.runtime.typeutils.StringDataTypeInfo;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.WrappingRuntimeException;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -63,6 +68,28 @@ public final class ListAggWithRetractAggFunction
 	}
 
 	@Override
+	public TypeInformation<StringData> getResultType() {
+		return StringDataTypeInfo.INSTANCE;
+	}
+
+	@Override
+	public TypeInformation<ListAggWithRetractAccumulator> getAccumulatorType() {
+		try {
+			Class<ListAggWithRetractAccumulator> clazz = ListAggWithRetractAccumulator.class;
+			List<PojoField> pojoFields = new ArrayList<>();
+			pojoFields.add(new PojoField(
+				clazz.getDeclaredField("list"),
+				new ListViewTypeInfo<>(StringDataTypeInfo.INSTANCE)));
+			pojoFields.add(new PojoField(
+				clazz.getDeclaredField("retractList"),
+				new ListViewTypeInfo<>(StringDataTypeInfo.INSTANCE)));
+			return new PojoTypeInfo<>(clazz, pojoFields);
+		} catch (NoSuchFieldException e) {
+			throw new WrappingRuntimeException(e);
+		}
+	}
+
+	@Override
 	public ListAggWithRetractAccumulator createAccumulator() {
 		return new ListAggWithRetractAccumulator();
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
index a7908a9..cdbbf2b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
@@ -19,13 +19,18 @@
 package org.apache.flink.table.planner.functions.aggfunctions;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.dataview.ListViewTypeInfo;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.runtime.typeutils.StringDataTypeInfo;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.WrappingRuntimeException;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -64,6 +69,31 @@ public final class ListAggWsWithRetractAggFunction
 	}
 
 	@Override
+	public TypeInformation<StringData> getResultType() {
+		return StringDataTypeInfo.INSTANCE;
+	}
+
+	@Override
+	public TypeInformation<ListAggWsWithRetractAccumulator> getAccumulatorType() {
+		try {
+			Class<ListAggWsWithRetractAccumulator> clazz = ListAggWsWithRetractAccumulator.class;
+			List<PojoField> pojoFields = new ArrayList<>();
+			pojoFields.add(new PojoField(
+				clazz.getDeclaredField("list"),
+				new ListViewTypeInfo<>(StringDataTypeInfo.INSTANCE)));
+			pojoFields.add(new PojoField(
+				clazz.getDeclaredField("retractList"),
+				new ListViewTypeInfo<>(StringDataTypeInfo.INSTANCE)));
+			pojoFields.add(new PojoField(
+				clazz.getDeclaredField("delimiter"),
+				StringDataTypeInfo.INSTANCE));
+			return new PojoTypeInfo<>(clazz, pojoFields);
+		} catch (NoSuchFieldException e) {
+			throw new WrappingRuntimeException(e);
+		}
+	}
+
+	@Override
 	public ListAggWsWithRetractAccumulator createAccumulator() {
 		return new ListAggWsWithRetractAccumulator();
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index 5f099a8..471a39b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -41,7 +41,6 @@ import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataTy
 import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.types.Row
 import org.apache.flink.util.InstantiationUtil
-
 import com.google.common.primitives.Primitives
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.rex.{RexLiteral, RexNode}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index d3c6b77..0808b47 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -547,6 +547,55 @@ class AggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
+
+  @Test
+  def testListAggWithRetraction(): Unit = {
+    env.setParallelism(1) // we have to use parallelism=1 to make sure the result is deterministic
+    val dataWithNull = List(
+      ("1", "a"),
+      ("1", "b"),
+      ("1", null),
+      ("1", "a"))
+
+    val t: DataStream[(String, String)] = failingDataSource(dataWithNull)
+    val streamTable = t.toTable(tEnv, 'x, 'y)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.executeSql(
+      """
+        |CREATE VIEW view1 AS
+        |SELECT
+        |    x,
+        |    y,
+        |    CAST(COUNT(1) AS VARCHAR) AS ct
+        |FROM T
+        |GROUP BY
+        |    x, y
+        |""".stripMargin)
+
+    // | x | concat_ws  |
+    // |---|------------|
+    // | 1 | a=2        |
+    // | 1 | b=1        |
+    // | 1 | 1          |
+    val sqlQuery =
+      s"""
+         |select
+         |     x,
+         |     '[' || LISTAGG(CONCAT_WS('=', y, ct), ';') || ']' AS list1,
+         |     '[' || LISTAGG(CONCAT_WS('=', y, ct)) || ']' AS list2
+         |FROM view1
+         |GROUP BY x
+       """.stripMargin
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List("1,[b=1;1;a=2],[b=1,1,a=2]")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
   @Test
   def testListAggWithNullData(): Unit = {
     val dataWithNull = List(