You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/08/11 08:33:17 UTC
[flink] branch release-1.11 updated:
[FLINK-18862][table-planner-blink] Fix LISTAGG throws BinaryRawValueData
cannot be cast to StringData exception during runtime
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new a72a8cd [FLINK-18862][table-planner-blink] Fix LISTAGG throws BinaryRawValueData cannot be cast to StringData exception during runtime
a72a8cd is described below
commit a72a8cd430a00c51ba55d7d9eed3e39dcbac3164
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Aug 11 16:32:41 2020 +0800
[FLINK-18862][table-planner-blink] Fix LISTAGG throws BinaryRawValueData cannot be cast to StringData exception during runtime
This closes #13112
---
.../ListAggWithRetractAggFunction.java | 27 ++++++++++++
.../ListAggWsWithRetractAggFunction.java | 30 +++++++++++++
.../functions/utils/UserDefinedFunctionUtils.scala | 1 -
.../runtime/stream/sql/AggregateITCase.scala | 49 ++++++++++++++++++++++
4 files changed, 106 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
index 4f90054..6b204ae 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
@@ -19,13 +19,18 @@
package org.apache.flink.table.planner.functions.aggfunctions;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.dataview.ListViewTypeInfo;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.runtime.typeutils.StringDataTypeInfo;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.WrappingRuntimeException;
import java.util.ArrayList;
import java.util.List;
@@ -63,6 +68,28 @@ public final class ListAggWithRetractAggFunction
}
@Override
+ public TypeInformation<StringData> getResultType() { // explicitly reports the aggregate's result type as StringData
+ return StringDataTypeInfo.INSTANCE; // NOTE(review): per the commit subject this is part of the fix for "BinaryRawValueData cannot be cast to StringData"; presumably the type was otherwise treated as RAW - confirm
+ }
+
+ @Override
+ public TypeInformation<ListAggWithRetractAccumulator> getAccumulatorType() { // describes the accumulator as a POJO whose fields are typed explicitly
+ try {
+ Class<ListAggWithRetractAccumulator> clazz = ListAggWithRetractAccumulator.class;
+ List<PojoField> pojoFields = new ArrayList<>(); // one PojoField per accumulator field registered below
+ pojoFields.add(new PojoField(
+ clazz.getDeclaredField("list"), // reflective lookup by name; throws NoSuchFieldException if the field is missing
+ new ListViewTypeInfo<>(StringDataTypeInfo.INSTANCE))); // ListView type info with StringData element type
+ pojoFields.add(new PojoField(
+ clazz.getDeclaredField("retractList"),
+ new ListViewTypeInfo<>(StringDataTypeInfo.INSTANCE))); // same element typing as "list"
+ return new PojoTypeInfo<>(clazz, pojoFields);
+ } catch (NoSuchFieldException e) { // reached only if a field name above does not exist on the accumulator class
+ throw new WrappingRuntimeException(e); // rethrown unchecked with the original exception passed as the wrapped cause
+ }
+ }
+
+ @Override
public ListAggWithRetractAccumulator createAccumulator() {
return new ListAggWithRetractAccumulator();
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
index a7908a9..cdbbf2b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
@@ -19,13 +19,18 @@
package org.apache.flink.table.planner.functions.aggfunctions;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.dataview.ListViewTypeInfo;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.runtime.typeutils.StringDataTypeInfo;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.WrappingRuntimeException;
import java.util.ArrayList;
import java.util.List;
@@ -64,6 +69,31 @@ public final class ListAggWsWithRetractAggFunction
}
@Override
+ public TypeInformation<StringData> getResultType() { // explicitly reports the aggregate's result type as StringData
+ return StringDataTypeInfo.INSTANCE; // NOTE(review): per the commit subject this is part of the fix for "BinaryRawValueData cannot be cast to StringData"; presumably the type was otherwise treated as RAW - confirm
+ }
+
+ @Override
+ public TypeInformation<ListAggWsWithRetractAccumulator> getAccumulatorType() { // describes the accumulator as a POJO whose fields are typed explicitly
+ try {
+ Class<ListAggWsWithRetractAccumulator> clazz = ListAggWsWithRetractAccumulator.class;
+ List<PojoField> pojoFields = new ArrayList<>(); // one PojoField per accumulator field registered below
+ pojoFields.add(new PojoField(
+ clazz.getDeclaredField("list"), // reflective lookup by name; throws NoSuchFieldException if the field is missing
+ new ListViewTypeInfo<>(StringDataTypeInfo.INSTANCE))); // ListView type info with StringData element type
+ pojoFields.add(new PojoField(
+ clazz.getDeclaredField("retractList"),
+ new ListViewTypeInfo<>(StringDataTypeInfo.INSTANCE))); // same element typing as "list"
+ pojoFields.add(new PojoField(
+ clazz.getDeclaredField("delimiter"),
+ StringDataTypeInfo.INSTANCE)); // "delimiter" is typed as a single StringData value, not a ListView
+ return new PojoTypeInfo<>(clazz, pojoFields);
+ } catch (NoSuchFieldException e) { // reached only if a field name above does not exist on the accumulator class
+ throw new WrappingRuntimeException(e); // rethrown unchecked with the original exception passed as the wrapped cause
+ }
+ }
+
+ @Override
public ListAggWsWithRetractAccumulator createAccumulator() {
return new ListAggWsWithRetractAccumulator();
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index 5f099a8..471a39b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -41,7 +41,6 @@ import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataTy
import org.apache.flink.table.typeutils.FieldInfoUtils
import org.apache.flink.types.Row
import org.apache.flink.util.InstantiationUtil
-
import com.google.common.primitives.Primitives
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.rex.{RexLiteral, RexNode}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index d3c6b77..0808b47 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -547,6 +547,55 @@ class AggregateITCase(
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
+
+ @Test
+ def testListAggWithRetraction(): Unit = { // runs LISTAGG, with and without an explicit delimiter, on top of a grouped COUNT view
+ env.setParallelism(1) // we have to use parallelism=1 to make sure the result is deterministic
+ val dataWithNull = List(
+ ("1", "a"),
+ ("1", "b"),
+ ("1", null), // null y ends up as its own (x, y) group in view1
+ ("1", "a")) // second "a": the count of group (1, a) becomes 2
+
+ val t: DataStream[(String, String)] = failingDataSource(dataWithNull)
+ val streamTable = t.toTable(tEnv, 'x, 'y)
+ tEnv.registerTable("T", streamTable)
+
+ tEnv.executeSql( // view1: one row per (x, y) group with its count cast to VARCHAR as ct
+ """
+ |CREATE VIEW view1 AS
+ |SELECT
+ | x,
+ | y,
+ | CAST(COUNT(1) AS VARCHAR) AS ct
+ |FROM T
+ |GROUP BY
+ | x, y
+ |""".stripMargin)
+
+ // | x | concat_ws | <- value of CONCAT_WS('=', y, ct) for each row of view1
+ // |---|------------|
+ // | 1 | a=2 |
+ // | 1 | b=1 |
+ // | 1 | 1 | <- y is null for this group, so only the count shows up (see expected below)
+ val sqlQuery =
+ s"""
+ |select
+ | x,
+ | '[' || LISTAGG(CONCAT_WS('=', y, ct), ';') || ']' AS list1,
+ | '[' || LISTAGG(CONCAT_WS('=', y, ct)) || ']' AS list2
+ |FROM view1
+ |GROUP BY x
+ """.stripMargin
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = List("1,[b=1;1;a=2],[b=1,1,a=2]") // list1 is joined with ';', list2 with ','; NOTE(review): a=2 is last presumably because a=1 was retracted and the updated value re-appended - confirm
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
+
@Test
def testListAggWithNullData(): Unit = {
val dataWithNull = List(