You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 03:38:19 UTC
[flink] 01/03: [FLINK-13529][table-planner-blink] Rename CONCAT_AGG
to LISTAGG and fix the behavior according to the ANSI-SQL
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ca414aeb4e7495bbfa2751d241aafc85bc3fa9b6
Author: beyond1920 <be...@126.com>
AuthorDate: Thu Aug 1 16:55:23 2019 +0800
[FLINK-13529][table-planner-blink] Rename CONCAT_AGG to LISTAGG and fix the behavior according to the ANSI-SQL
According to the ANSI-SQL, the LISTAGG function is used to transform values from a group of rows into a list of values that are delimited by a configurable separator.
This closes #9316
---
...ConcatAggFunction.java => ListAggFunction.java} | 14 +++---
...ion.java => ListAggWithRetractAggFunction.java} | 30 ++++++------
...n.java => ListAggWsWithRetractAggFunction.java} | 30 ++++++------
.../functions/sql/FlinkSqlOperatorTable.java | 4 +-
...catAggFunction.java => SqlListAggFunction.java} | 27 +++++++----
.../plan/rules/logical/SplitAggregateRule.scala | 4 +-
.../planner/plan/utils/AggFunctionFactory.scala | 22 ++++-----
.../table/planner/plan/utils/AggregateUtil.scala | 4 +-
...java => ListAggWithRetractAggFunctionTest.java} | 16 +++----
...va => ListAggWsWithRetractAggFunctionTest.java} | 36 +++++++-------
.../apache/flink/table/api/stream/ExplainTest.xml | 48 +++++++++----------
.../planner/plan/batch/sql/RemoveCollationTest.xml | 16 +++----
.../plan/rules/logical/SplitAggregateRuleTest.xml | 8 ++--
.../plan/stream/sql/MiniBatchIntervalInferTest.xml | 40 ++++++++--------
.../plan/stream/sql/agg/DistinctAggregateTest.xml | 34 ++++++-------
.../stream/sql/agg/IncrementalAggregateTest.xml | 10 ++--
.../flink/table/api/stream/ExplainTest.scala | 4 +-
.../plan/batch/sql/RemoveCollationTest.scala | 4 +-
.../rules/logical/SplitAggregateRuleTest.scala | 2 +-
.../stream/sql/MiniBatchIntervalInferTest.scala | 6 +--
.../stream/sql/agg/DistinctAggregateTest.scala | 2 +-
.../table/validation/AggregateValidationTest.scala | 55 ++++++++++++++++++----
.../runtime/batch/sql/agg/SortAggITCase.scala | 11 ++---
.../runtime/stream/sql/AggregateITCase.scala | 20 ++++----
.../runtime/stream/table/AggregateITCase.scala | 2 +-
25 files changed, 245 insertions(+), 204 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
similarity index 91%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
index 2e8f5fb..377b251 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
@@ -32,23 +32,23 @@ import static org.apache.flink.table.planner.expressions.ExpressionBuilder.liter
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
/**
- * built-in concat aggregate function.
+ * built-in listagg aggregate function.
*/
-public class ConcatAggFunction extends DeclarativeAggregateFunction {
+public class ListAggFunction extends DeclarativeAggregateFunction {
private int operandCount;
private UnresolvedReferenceExpression acc = unresolvedRef("concatAcc");
private UnresolvedReferenceExpression accDelimiter = unresolvedRef("accDelimiter");
private Expression delimiter;
private Expression operand;
- public ConcatAggFunction(int operandCount) {
+ public ListAggFunction(int operandCount) {
this.operandCount = operandCount;
if (operandCount == 1) {
- delimiter = literal("\n", DataTypes.STRING());
+ delimiter = literal(",", DataTypes.STRING());
operand = operand(0);
} else {
- delimiter = operand(0);
- operand = operand(1);
+ delimiter = operand(1);
+ operand = operand(0);
}
}
@@ -75,7 +75,7 @@ public class ConcatAggFunction extends DeclarativeAggregateFunction {
@Override
public Expression[] initialValuesExpressions() {
return new Expression[] {
- /* delimiter */ literal("\n", DataTypes.STRING()),
+ /* delimiter */ literal(",", DataTypes.STRING()),
/* acc */ nullOf(DataTypes.STRING())
};
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
similarity index 77%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
index ea5857a..8840844 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
@@ -31,18 +31,18 @@ import java.util.List;
import java.util.Objects;
/**
- * built-in concat with retraction aggregate function.
+ * built-in listagg with retraction aggregate function.
*/
-public final class ConcatWithRetractAggFunction
- extends AggregateFunction<BinaryString, ConcatWithRetractAggFunction.ConcatWithRetractAccumulator> {
+public final class ListAggWithRetractAggFunction
+ extends AggregateFunction<BinaryString, ListAggWithRetractAggFunction.ListAggWithRetractAccumulator> {
private static final long serialVersionUID = -2836795091288790955L;
- private static final BinaryString lineDelimiter = BinaryString.fromString("\n");
+ private static final BinaryString lineDelimiter = BinaryString.fromString(",");
/**
- * The initial accumulator for concat with retraction aggregate function.
+ * The initial accumulator for listagg with retraction aggregate function.
*/
- public static class ConcatWithRetractAccumulator {
+ public static class ListAggWithRetractAccumulator {
public ListView<BinaryString> list = new ListView<>(BinaryStringTypeInfo.INSTANCE);
public ListView<BinaryString> retractList = new ListView<>(BinaryStringTypeInfo.INSTANCE);
@@ -55,25 +55,25 @@ public final class ConcatWithRetractAggFunction
if (o == null || getClass() != o.getClass()) {
return false;
}
- ConcatWithRetractAccumulator that = (ConcatWithRetractAccumulator) o;
+ ListAggWithRetractAccumulator that = (ListAggWithRetractAccumulator) o;
return Objects.equals(list, that.list) &&
Objects.equals(retractList, that.retractList);
}
}
@Override
- public ConcatWithRetractAccumulator createAccumulator() {
- return new ConcatWithRetractAccumulator();
+ public ListAggWithRetractAccumulator createAccumulator() {
+ return new ListAggWithRetractAccumulator();
}
- public void accumulate(ConcatWithRetractAccumulator acc, BinaryString value) throws Exception {
+ public void accumulate(ListAggWithRetractAccumulator acc, BinaryString value) throws Exception {
// ignore null value
if (value != null) {
acc.list.add(value);
}
}
- public void retract(ConcatWithRetractAccumulator acc, BinaryString value) throws Exception {
+ public void retract(ListAggWithRetractAccumulator acc, BinaryString value) throws Exception {
if (value != null) {
if (!acc.list.remove(value)) {
acc.retractList.add(value);
@@ -81,8 +81,8 @@ public final class ConcatWithRetractAggFunction
}
}
- public void merge(ConcatWithRetractAccumulator acc, Iterable<ConcatWithRetractAccumulator> its) throws Exception {
- for (ConcatWithRetractAccumulator otherAcc : its) {
+ public void merge(ListAggWithRetractAccumulator acc, Iterable<ListAggWithRetractAccumulator> its) throws Exception {
+ for (ListAggWithRetractAccumulator otherAcc : its) {
// merge list of acc and other
List<BinaryString> buffer = new ArrayList<>();
for (BinaryString binaryString : acc.list.get()) {
@@ -117,7 +117,7 @@ public final class ConcatWithRetractAggFunction
}
@Override
- public BinaryString getValue(ConcatWithRetractAccumulator acc) {
+ public BinaryString getValue(ListAggWithRetractAccumulator acc) {
try {
Iterable<BinaryString> accList = acc.list.get();
if (accList == null || !accList.iterator().hasNext()) {
@@ -131,7 +131,7 @@ public final class ConcatWithRetractAggFunction
}
}
- public void resetAccumulator(ConcatWithRetractAccumulator acc) {
+ public void resetAccumulator(ListAggWithRetractAccumulator acc) {
acc.list.clear();
acc.retractList.clear();
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
similarity index 77%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
index a968cb4..317f97e 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
@@ -31,20 +31,20 @@ import java.util.List;
import java.util.Objects;
/**
- * built-in concatWs with retraction aggregate function.
+ * built-in listAggWs with retraction aggregate function.
*/
-public final class ConcatWsWithRetractAggFunction
- extends AggregateFunction<BinaryString, ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator> {
+public final class ListAggWsWithRetractAggFunction
+ extends AggregateFunction<BinaryString, ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator> {
private static final long serialVersionUID = -8627988150350160473L;
/**
* The initial accumulator for concat with retraction aggregate function.
*/
- public static class ConcatWsWithRetractAccumulator {
+ public static class ListAggWsWithRetractAccumulator {
public ListView<BinaryString> list = new ListView<>(BinaryStringTypeInfo.INSTANCE);
public ListView<BinaryString> retractList = new ListView<>(BinaryStringTypeInfo.INSTANCE);
- public BinaryString delimiter = BinaryString.fromString("\n");
+ public BinaryString delimiter = BinaryString.fromString(",");
@VisibleForTesting
@Override
@@ -55,7 +55,7 @@ public final class ConcatWsWithRetractAggFunction
if (o == null || getClass() != o.getClass()) {
return false;
}
- ConcatWsWithRetractAccumulator that = (ConcatWsWithRetractAccumulator) o;
+ ListAggWsWithRetractAccumulator that = (ListAggWsWithRetractAccumulator) o;
return Objects.equals(list, that.list) &&
Objects.equals(retractList, that.retractList) &&
Objects.equals(delimiter, that.delimiter);
@@ -63,11 +63,11 @@ public final class ConcatWsWithRetractAggFunction
}
@Override
- public ConcatWsWithRetractAccumulator createAccumulator() {
- return new ConcatWsWithRetractAccumulator();
+ public ListAggWsWithRetractAccumulator createAccumulator() {
+ return new ListAggWsWithRetractAccumulator();
}
- public void accumulate(ConcatWsWithRetractAccumulator acc, BinaryString lineDelimiter, BinaryString value) throws Exception {
+ public void accumulate(ListAggWsWithRetractAccumulator acc, BinaryString value, BinaryString lineDelimiter) throws Exception {
// ignore null value
if (value != null) {
acc.delimiter = lineDelimiter;
@@ -75,7 +75,7 @@ public final class ConcatWsWithRetractAggFunction
}
}
- public void retract(ConcatWsWithRetractAccumulator acc, BinaryString lineDelimiter, BinaryString value) throws Exception {
+ public void retract(ListAggWsWithRetractAccumulator acc, BinaryString value, BinaryString lineDelimiter) throws Exception {
if (value != null) {
acc.delimiter = lineDelimiter;
if (!acc.list.remove(value)) {
@@ -84,8 +84,8 @@ public final class ConcatWsWithRetractAggFunction
}
}
- public void merge(ConcatWsWithRetractAccumulator acc, Iterable<ConcatWsWithRetractAccumulator> its) throws Exception {
- for (ConcatWsWithRetractAccumulator otherAcc : its) {
+ public void merge(ListAggWsWithRetractAccumulator acc, Iterable<ListAggWsWithRetractAccumulator> its) throws Exception {
+ for (ListAggWsWithRetractAccumulator otherAcc : its) {
if (!otherAcc.list.get().iterator().hasNext()
&& !otherAcc.retractList.get().iterator().hasNext()) {
// otherAcc is empty, skip it
@@ -127,7 +127,7 @@ public final class ConcatWsWithRetractAggFunction
}
@Override
- public BinaryString getValue(ConcatWsWithRetractAccumulator acc) {
+ public BinaryString getValue(ListAggWsWithRetractAccumulator acc) {
try {
Iterable<BinaryString> accList = acc.list.get();
if (accList == null || !accList.iterator().hasNext()) {
@@ -141,8 +141,8 @@ public final class ConcatWsWithRetractAggFunction
}
}
- public void resetAccumulator(ConcatWsWithRetractAccumulator acc) {
- acc.delimiter = BinaryString.fromString("\n");
+ public void resetAccumulator(ListAggWsWithRetractAccumulator acc) {
+ acc.delimiter = BinaryString.fromString(",");
acc.list.clear();
acc.retractList.clear();
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 50ff193..dbabb69 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -946,9 +946,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFirstLastValueAggFunction LAST_VALUE = new SqlFirstLastValueAggFunction(SqlKind.LAST_VALUE);
/**
- * <code>CONCAT_AGG</code> aggregate function.
+ * <code>LISTAGG</code> aggregate function.
*/
- public static final SqlConcatAggFunction CONCAT_AGG = new SqlConcatAggFunction();
+ public static final SqlListAggFunction LISTAGG = new SqlListAggFunction();
/**
* <code>INCR_SUM</code> aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
similarity index 71%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
index 7e215e0..9557f0c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
@@ -25,30 +25,37 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeTransforms;
import java.util.List;
/**
- * <code>CONCAT_AGG</code> aggregate function returns the concatenation of
+ * <code>LISTAGG</code> aggregate function returns the concatenation of
* a list of values that are input to the function.
+ *
+ * <p>NOTE: The difference between this and {@link SqlStdOperatorTable#LISTAGG} is that:
+ * (1). constraint the second parameter must to be a character literal.
+ * (2). not require over clause to use this aggregate function.
*/
-public class SqlConcatAggFunction extends SqlAggFunction {
+public class SqlListAggFunction extends SqlAggFunction {
- public SqlConcatAggFunction() {
- super("CONCAT_AGG",
+ public SqlListAggFunction() {
+ super("LISTAGG",
null,
- SqlKind.OTHER_FUNCTION,
- ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+ SqlKind.LISTAGG,
+ ReturnTypes.ARG0_NULLABLE,
null,
OperandTypes.or(
OperandTypes.CHARACTER,
- OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
- SqlFunctionCategory.STRING,
+ OperandTypes.sequence(
+ "'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)'",
+ OperandTypes.CHARACTER,
+ OperandTypes.and(OperandTypes.CHARACTER, OperandTypes.LITERAL)
+ )),
+ SqlFunctionCategory.SYSTEM,
false,
false);
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index b2bd3a5..524d5a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -334,8 +334,8 @@ object SplitAggregateRule {
(Seq(FlinkSqlOperatorTable.FIRST_VALUE), Seq(FlinkSqlOperatorTable.FIRST_VALUE)),
FlinkSqlOperatorTable.LAST_VALUE ->
(Seq(FlinkSqlOperatorTable.LAST_VALUE), Seq(FlinkSqlOperatorTable.LAST_VALUE)),
- FlinkSqlOperatorTable.CONCAT_AGG ->
- (Seq(FlinkSqlOperatorTable.CONCAT_AGG), Seq(FlinkSqlOperatorTable.CONCAT_AGG)),
+ FlinkSqlOperatorTable.LISTAGG ->
+ (Seq(FlinkSqlOperatorTable.LISTAGG), Seq(FlinkSqlOperatorTable.LISTAGG)),
SINGLE_VALUE -> (Seq(SINGLE_VALUE), Seq(SINGLE_VALUE))
)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 4dd3439..a8e00d3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu
import org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._
import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._
import org.apache.flink.table.planner.functions.aggfunctions._
-import org.apache.flink.table.planner.functions.sql.{SqlConcatAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
+import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
import org.apache.flink.table.planner.functions.utils.AggSqlFunction
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
@@ -113,11 +113,11 @@ class AggFunctionFactory(
case a: SqlFirstLastValueAggFunction if a.getKind == SqlKind.LAST_VALUE =>
createLastValueAggFunction(argTypes, index)
- case _: SqlConcatAggFunction if call.getArgList.size() == 1 =>
- createConcatAggFunction(argTypes, index)
+ case _: SqlListAggFunction if call.getArgList.size() == 1 =>
+ createListAggFunction(argTypes, index)
- case _: SqlConcatAggFunction if call.getArgList.size() == 2 =>
- createConcatWsAggFunction(argTypes, index)
+ case _: SqlListAggFunction if call.getArgList.size() == 2 =>
+ createListAggWsFunction(argTypes, index)
// TODO supports SqlCardinalityCountAggFunction
@@ -606,23 +606,23 @@ class AggFunctionFactory(
}
}
- private def createConcatAggFunction(
+ private def createListAggFunction(
argTypes: Array[LogicalType],
index: Int): UserDefinedFunction = {
if (needRetraction(index)) {
- new ConcatWithRetractAggFunction
+ new ListAggWithRetractAggFunction
} else {
- new ConcatAggFunction(1)
+ new ListAggFunction(1)
}
}
- private def createConcatWsAggFunction(
+ private def createListAggWsFunction(
argTypes: Array[LogicalType],
index: Int): UserDefinedFunction = {
if (needRetraction(index)) {
- new ConcatWsWithRetractAggFunction
+ new ListAggWsWithRetractAggFunction
} else {
- new ConcatAggFunction(2)
+ new ListAggFunction(2)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 544555a..8877be5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.planner.dataview.DataViewUtils.useNullSerializerFo
import org.apache.flink.table.planner.dataview.{DataViewSpec, MapViewSpec}
import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart, RexNodeConverter}
import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
-import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlConcatAggFunction, SqlFirstLastValueAggFunction}
+import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlListAggFunction, SqlFirstLastValueAggFunction}
import org.apache.flink.table.planner.functions.utils.AggSqlFunction
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
@@ -541,7 +541,7 @@ object AggregateUtil extends Enumeration {
_: SqlSumAggFunction |
_: SqlSumEmptyIsZeroAggFunction |
_: SqlSingleValueAggFunction |
- _: SqlConcatAggFunction => true
+ _: SqlListAggFunction => true
case _: SqlFirstLastValueAggFunction => aggCall.getArgList.size() == 1
case _ => false
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
similarity index 79%
rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
index 725d802..5c5566e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
@@ -20,17 +20,17 @@ package org.apache.flink.table.planner.functions.aggfunctions;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.planner.functions.aggfunctions.ConcatWithRetractAggFunction.ConcatWithRetractAccumulator;
+import org.apache.flink.table.planner.functions.aggfunctions.ListAggWithRetractAggFunction.ListAggWithRetractAccumulator;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
/**
- * Test case for built-in concat with retraction aggregate function.
+ * Test case for built-in LISTAGG with retraction aggregate function.
*/
-public class ConcatWithRetractAggFunctionTest
- extends AggFunctionTestBase<BinaryString, ConcatWithRetractAccumulator> {
+public class ListAggWithRetractAggFunctionTest
+ extends AggFunctionTestBase<BinaryString, ListAggWithRetractAccumulator> {
@Override
protected List<List<BinaryString>> getInputValueSets() {
@@ -53,14 +53,14 @@ public class ConcatWithRetractAggFunctionTest
@Override
protected List<BinaryString> getExpectedResults() {
return Arrays.asList(
- BinaryString.fromString("a\nb\nc\nd\ne\nf"),
+ BinaryString.fromString("a,b,c,d,e,f"),
null,
BinaryString.fromString("a"));
}
@Override
- protected AggregateFunction<BinaryString, ConcatWithRetractAccumulator> getAggregator() {
- return new ConcatWithRetractAggFunction();
+ protected AggregateFunction<BinaryString, ListAggWithRetractAccumulator> getAggregator() {
+ return new ListAggWithRetractAggFunction();
}
@Override
@@ -75,6 +75,6 @@ public class ConcatWithRetractAggFunctionTest
@Override
protected Class<?> getAccClass() {
- return ConcatWithRetractAccumulator.class;
+ return ListAggWithRetractAccumulator.class;
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
similarity index 78%
rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
index d9e06d0..67ee8ff 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.functions.aggfunctions;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.planner.functions.aggfunctions.ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator;
+import org.apache.flink.table.planner.functions.aggfunctions.ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator;
import org.apache.flink.util.Preconditions;
import java.lang.reflect.InvocationTargetException;
@@ -34,8 +34,8 @@ import static org.junit.Assert.assertEquals;
/**
* Test case for built-in concatWs with retraction aggregate function.
*/
-public class ConcatWsWithRetractAggFunctionTest
- extends AggFunctionTestBase<BinaryString, ConcatWsWithRetractAccumulator> {
+public class ListAggWsWithRetractAggFunctionTest
+ extends AggFunctionTestBase<BinaryString, ListAggWsWithRetractAccumulator> {
@Override
protected List<List<BinaryString>> getInputValueSets() {
@@ -84,8 +84,8 @@ public class ConcatWsWithRetractAggFunctionTest
}
@Override
- protected AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> getAggregator() {
- return new ConcatWsWithRetractAggFunction();
+ protected AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> getAggregator() {
+ return new ListAggWsWithRetractAggFunction();
}
@Override
@@ -101,14 +101,14 @@ public class ConcatWsWithRetractAggFunctionTest
@Override
protected Class<?> getAccClass() {
- return ConcatWsWithRetractAccumulator.class;
+ return ListAggWsWithRetractAccumulator.class;
}
@Override
protected <E> void validateResult(E expected, E result) {
- if (expected instanceof ConcatWsWithRetractAccumulator && result instanceof ConcatWsWithRetractAccumulator) {
- ConcatWsWithRetractAccumulator e = (ConcatWsWithRetractAccumulator) expected;
- ConcatWsWithRetractAccumulator r = (ConcatWsWithRetractAccumulator) result;
+ if (expected instanceof ListAggWsWithRetractAccumulator && result instanceof ListAggWsWithRetractAccumulator) {
+ ListAggWsWithRetractAccumulator e = (ListAggWsWithRetractAccumulator) expected;
+ ListAggWsWithRetractAccumulator r = (ListAggWsWithRetractAccumulator) result;
assertEquals(e.list, r.list);
assertEquals(e.list, r.list);
} else {
@@ -117,31 +117,31 @@ public class ConcatWsWithRetractAggFunctionTest
}
@Override
- protected ConcatWsWithRetractAccumulator accumulateValues(List<BinaryString> values)
+ protected ListAggWsWithRetractAccumulator accumulateValues(List<BinaryString> values)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
- AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator();
- ConcatWsWithRetractAccumulator accumulator = getAggregator().createAccumulator();
+ AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator();
+ ListAggWsWithRetractAccumulator accumulator = getAggregator().createAccumulator();
Method accumulateFunc = getAccumulateFunc();
Preconditions.checkArgument(values.size() % 2 == 0,
"number of values must be an integer multiple of 2.");
for (int i = 0; i < values.size(); i += 2) {
- BinaryString value = values.get(i);
- BinaryString delimiter = values.get(i + 1);
+ BinaryString value = values.get(i + 1);
+ BinaryString delimiter = values.get(i);
accumulateFunc.invoke(aggregator, accumulator, delimiter, value);
}
return accumulator;
}
@Override
- protected void retractValues(ConcatWsWithRetractAccumulator accumulator, List<BinaryString> values)
+ protected void retractValues(ListAggWsWithRetractAccumulator accumulator, List<BinaryString> values)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
- AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator();
+ AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator();
Method retractFunc = getRetractFunc();
Preconditions.checkArgument(values.size() % 2 == 0,
"number of values must be an integer multiple of 2.");
for (int i = 0; i < values.size(); i += 2) {
- BinaryString value = values.get(i);
- BinaryString delimiter = values.get(i + 1);
+ BinaryString value = values.get(i + 1);
+ BinaryString delimiter = values.get(i);
retractFunc.invoke(aggregator, accumulator, delimiter, value);
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index fd84d11..16d3278 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -637,8 +637,8 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(name=[appendSink1], fields=[a, b])
+- LogicalProject(id1=[$0], EXPR$1=[$2])
- +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
- +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+ +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'#'])
+- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
@@ -649,8 +649,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
LogicalSink(name=[appendSink2], fields=[a, b])
+- LogicalProject(id1=[$0], EXPR$1=[$2])
- +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
- +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+ +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
+- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
@@ -670,15 +670,15 @@ Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc]
+- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
Sink(name=[appendSink1], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
- +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Reused(reference_id=[1])
Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
- +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+ +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+- Reused(reference_id=[1])
== Physical Execution Plan ==
@@ -713,11 +713,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+ content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
ship_strategy : FORWARD
: Operator
- content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
ship_strategy : HASH
: Operator
@@ -725,11 +725,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+ content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
ship_strategy : FORWARD
: Operator
- content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
ship_strategy : HASH
: Operator
@@ -784,8 +784,8 @@ Union(all=[true], union=[a, b, c])
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(name=[appendSink1], fields=[a, b])
+- LogicalProject(id1=[$0], EXPR$1=[$2])
- +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
- +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+ +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'#'])
+- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
@@ -796,8 +796,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
LogicalSink(name=[appendSink2], fields=[a, b])
+- LogicalProject(id1=[$0], EXPR$1=[$2])
- +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
- +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+ +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
+- LogicalProject(id1=[$0], ts=[$2], text=[$1])
+- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
@@ -817,15 +817,15 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime])
Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
- +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text])
+ +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
+- Reused(reference_id=[1])
Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
- +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text])
+ +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3])
+- Reused(reference_id=[1])
== Physical Execution Plan ==
@@ -860,11 +860,11 @@ Sink(name=[appendSink2], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+ content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
ship_strategy : FORWARD
: Operator
- content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
ship_strategy : HASH
: Operator
@@ -872,11 +872,11 @@ Sink(name=[appendSink2], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+ content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
ship_strategy : FORWARD
: Operator
- content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
ship_strategy : HASH
: Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
index fd8003b..76b6419 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
@@ -310,7 +310,7 @@ v2 as (
),
join_tb2 as (
- select tb1_id, concat_agg(tb2_name, ',') as tb2_names
+ select tb1_id, LISTAGG(tb2_name, ',') as tb2_names
from (
select v1.id as tb1_id, tb2.name as tb2_name
from v1 left outer join tb2 on tb2_id = tb2.id
@@ -318,7 +318,7 @@ join_tb2 as (
),
join_tb3 as (
- select tb1_id, concat_agg(tb3_name, ',') as tb3_names
+ select tb1_id, LISTAGG(tb3_name, ',') as tb3_names
from (
select v2.id as tb1_id, tb3.name as tb3_name
from v2 left outer join tb3 on tb3_id = tb3.id
@@ -349,7 +349,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], name=[$4], tb2_names=[$6], t
: :- LogicalJoin(condition=[=($0, $7)], joinType=[left])
: : :- LogicalJoin(condition=[=($0, $5)], joinType=[left])
: : : :- LogicalTableScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]])
- : : : +- LogicalAggregate(group=[{0}], tb2_names=[CONCAT_AGG($1, $2)])
+ : : : +- LogicalAggregate(group=[{0}], tb2_names=[LISTAGG($1, $2)])
: : : +- LogicalProject(tb1_id=[$0], tb2_name=[$3], $f2=[_UTF-16LE','])
: : : +- LogicalJoin(condition=[=($1, $2)], joinType=[left])
: : : :- LogicalProject(id=[$0], tb2_id=[$5])
@@ -357,7 +357,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], name=[$4], tb2_names=[$6], t
: : : : :- LogicalTableScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]])
: : : : +- LogicalTableFunctionScan(invocation=[split($cor0.tb2_ids)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;])
: : : +- LogicalTableScan(table=[[default_catalog, default_database, tb2, source: [TestTableSource(id, name)]]])
- : : +- LogicalAggregate(group=[{0}], tb3_names=[CONCAT_AGG($1, $2)])
+ : : +- LogicalAggregate(group=[{0}], tb3_names=[LISTAGG($1, $2)])
: : +- LogicalProject(tb1_id=[$0], tb3_name=[$3], $f2=[_UTF-16LE','])
: : +- LogicalJoin(condition=[=($1, $2)], joinType=[left])
: : :- LogicalProject(id=[$0], tb3_id=[$5])
@@ -382,10 +382,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, tb3_names, name0, name1])
: : : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(id, tb1_id)], select=[id, key, tb2_ids, tb3_ids, name, tb1_id, tb2_names], rightSorted=[true])
: : : :- Exchange(distribution=[hash[id]])
: : : : +- TableSourceScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]], fields=[id, key, tb2_ids, tb3_ids, name], reuse_id=[1])
- : : : +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb2_names])
+ : : : +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb2_names])
: : : +- Sort(orderBy=[tb1_id ASC])
: : : +- Exchange(distribution=[hash[tb1_id]])
- : : : +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_CONCAT_AGG(tb2_name, $f2) AS (accDelimiter$0, concatAcc$1)])
+ : : : +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_LISTAGG(tb2_name, $f2) AS (accDelimiter$0, concatAcc$1)])
: : : +- Sort(orderBy=[tb1_id ASC])
: : : +- Calc(select=[id AS tb1_id, name AS tb2_name, _UTF-16LE',' AS $f2])
: : : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb2_id, id0)], select=[id, tb2_id, id0, name])
@@ -395,10 +395,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, tb3_names, name0, name1])
: : : : +- Reused(reference_id=[1])
: : : +- Exchange(distribution=[hash[id]])
: : : +- TableSourceScan(table=[[default_catalog, default_database, tb2, source: [TestTableSource(id, name)]]], fields=[id, name])
- : : +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb3_names])
+ : : +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb3_names])
: : +- Sort(orderBy=[tb1_id ASC])
: : +- Exchange(distribution=[hash[tb1_id]])
- : : +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_CONCAT_AGG(tb3_name, $f2) AS (accDelimiter$0, concatAcc$1)])
+ : : +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_LISTAGG(tb3_name, $f2) AS (accDelimiter$0, concatAcc$1)])
: : +- Sort(orderBy=[tb1_id ASC])
: : +- Calc(select=[id AS tb1_id, name AS tb3_name, _UTF-16LE',' AS $f2])
: : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb3_id, id0)], select=[id, tb3_id, id0, name])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
index a57c307..b59a9c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -171,19 +171,19 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[MIN($3)], agg#1=[MAX($4)], agg#2=[SUM(
</TestCase>
<TestCase name="testSingleConcatAggWithDistinctAgg">
<Resource name="sql">
- <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+ <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+- LogicalProject(a=[$0], c=[$2], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalAggregate(group=[{0}], agg#0=[CONCAT_AGG($2)], agg#1=[$SUM0($3)])
-+- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[CONCAT_AGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
+FlinkLogicalAggregate(group=[{0}], agg#0=[LISTAGG($2)], agg#1=[$SUM0($3)])
++- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
+- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
+- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
+- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 19d3208..9a1d03b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -220,11 +220,11 @@ Calc(select=[a, b])
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(name=[appendSink1], fields=[a, b])
+- LogicalProject(id1=[$1], EXPR$1=[$2])
- +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
- +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'*'], text=[$1])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+ +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*'])
+- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
- +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
- +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+ +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
+- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
@@ -235,8 +235,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
LogicalSink(name=[appendSink2], fields=[a, b])
+- LogicalProject(id1=[$1], EXPR$1=[$2])
- +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
- +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'-'], text=[$2])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-'])
+- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
@@ -249,8 +249,8 @@ LogicalSink(name=[appendSink3], fields=[a, b])
+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+- LogicalProject(id1=[$0], text=[$1])
+- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
- +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
- +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+ +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
+- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
@@ -269,21 +269,21 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
+- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+- Exchange(distribution=[hash[id1]])
- +- Calc(select=[ts, id1, _UTF-16LE'#' AS $f2, text])
+ +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
+- Reused(reference_id=[1])
Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
- +- Calc(select=[w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text])
+ +- Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3])
+- Reused(reference_id=[2])
Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
- +- Calc(select=[ts, id1, _UTF-16LE'-' AS $f2, text])
+ +- Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
+- Reused(reference_id=[1])
Sink(name=[appendSink3], fields=[a, b])
@@ -325,19 +325,19 @@ Sink(name=[appendSink3], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (ts, id1, _UTF-16LE'#' AS $f2, text))
+ content : Calc(select: (ts, id1, text, _UTF-16LE'#' AS $f3))
ship_strategy : FORWARD
: Operator
- content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
+ content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
ship_strategy : HASH
: Operator
- content : Calc(select: (w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text))
+ content : Calc(select: (w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3))
ship_strategy : FORWARD
: Operator
- content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
ship_strategy : HASH
: Operator
@@ -345,11 +345,11 @@ Sink(name=[appendSink3], fields=[a, b])
ship_strategy : FORWARD
: Operator
- content : Calc(select: (ts, id1, _UTF-16LE'-' AS $f2, text))
+ content : Calc(select: (ts, id1, text, _UTF-16LE'-' AS $f3))
ship_strategy : FORWARD
: Operator
- content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+ content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
ship_strategy : HASH
: Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index e9ec761..dc74eb4 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -677,18 +677,18 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
</TestCase>
<TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
<Resource name="sql">
- <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+ <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+- LogicalProject(a=[$0], c=[$2], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+GroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
+- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -697,20 +697,20 @@ GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b
</TestCase>
<TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
- <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+ <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+- LogicalProject(a=[$0], c=[$2], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
+- Exchange(distribution=[hash[a]])
- +- LocalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
+ +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
+- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
@@ -718,20 +718,20 @@ GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatA
</TestCase>
<TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
<Resource name="sql">
- <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+ <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+- LogicalProject(a=[$0], c=[$2], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
+GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
+- Exchange(distribution=[hash[a]])
- +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
+ +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
+- Exchange(distribution=[hash[a, $f3]])
+- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
@@ -743,23 +743,23 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETR
</TestCase>
<TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
- <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+ <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+- LogicalProject(a=[$0], c=[$2], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT(concat_agg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT(listagg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
+- Exchange(distribution=[hash[a]])
- +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT($f2) AS concat_agg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2])
- +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0])
+ +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS listagg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2])
+ +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0])
+- Exchange(distribution=[hash[a, $f3]])
- +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+ +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index a415121..ccef832 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -196,22 +196,22 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
</TestCase>
<TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
- <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+ <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+- LogicalProject(a=[$0], c=[$2], b=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2])
+- Exchange(distribution=[hash[a]])
- +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
+ +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
+- Exchange(distribution=[hash[a, $f3]])
- +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+ +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
+- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
+- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 9952e34..198fb0f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -131,7 +131,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
val table1 = util.tableEnv.sqlQuery(
"""
- |SELECT id1, CONCAT_AGG('#', text)
+ |SELECT id1, LISTAGG(text, '#')
|FROM TempTable
|GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND)
""".stripMargin)
@@ -140,7 +140,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
val table2 = util.tableEnv.sqlQuery(
"""
- |SELECT id1, CONCAT_AGG('*', text)
+ |SELECT id1, LISTAGG(text, '*')
|FROM TempTable
|GROUP BY id1, HOP(ts, INTERVAL '12' SECOND, INTERVAL '6' SECOND)
""".stripMargin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
index 73c202d..5a5bdf5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
@@ -307,7 +307,7 @@ class RemoveCollationTest extends TableTestBase {
|),
|
|join_tb2 as (
- | select tb1_id, concat_agg(tb2_name, ',') as tb2_names
+ | select tb1_id, LISTAGG(tb2_name, ',') as tb2_names
| from (
| select v1.id as tb1_id, tb2.name as tb2_name
| from v1 left outer join tb2 on tb2_id = tb2.id
@@ -315,7 +315,7 @@ class RemoveCollationTest extends TableTestBase {
|),
|
|join_tb3 as (
- | select tb1_id, concat_agg(tb3_name, ',') as tb3_names
+ | select tb1_id, LISTAGG(tb3_name, ',') as tb3_names
| from (
| select v2.id as tb1_id, tb3.name as tb3_name
| from v2 left outer join tb3 on tb3_id = tb3.id
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index b91649b..5d58dfb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -95,7 +95,7 @@ class SplitAggregateRuleTest extends TableTestBase {
@Test
def testSingleConcatAggWithDistinctAgg(): Unit = {
- util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
+ util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index ce9e042..9de3886 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -297,7 +297,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
val table2 = util.tableEnv.sqlQuery(
"""
|SELECT id1,
- | CONCAT_AGG('#', text) as text,
+ | LISTAGG(text, '#') as text,
| TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts
|FROM TempTable1
|GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
@@ -307,7 +307,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
val table3 = util.tableEnv.sqlQuery(
"""
|SELECT id1,
- | CONCAT_AGG('*', text)
+ | LISTAGG(text, '*')
|FROM TempTable2
|GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1
""".stripMargin)
@@ -317,7 +317,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
val table4 = util.tableEnv.sqlQuery(
"""
|SELECT id1,
- | CONCAT_AGG('-', text)
+ | LISTAGG(text, '-')
|FROM TempTable1
|GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1
""".stripMargin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 7a30103..8e36305 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -103,7 +103,7 @@ class DistinctAggregateTest(
@Test
def testSingleConcatAggWithDistinctAgg(): Unit = {
- util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
+ util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
index 5ce79fc..99ed303 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
@@ -22,14 +22,16 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.utils.{TableFunc0, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert.{assertTrue, fail}
import org.junit.Test
class AggregateValidationTest extends TableTestBase {
+ private val util = scalaStreamTestUtil()
@Test(expected = classOf[ValidationException])
def testGroupingOnNonExistentField(): Unit = {
- val util = streamTestUtil()
val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
val ds = table
@@ -40,7 +42,6 @@ class AggregateValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testGroupingInvalidSelection(): Unit = {
- val util = streamTestUtil()
val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
table
@@ -51,7 +52,6 @@ class AggregateValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testInvalidAggregationInSelection(): Unit = {
- val util = streamTestUtil()
val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
table
@@ -63,7 +63,6 @@ class AggregateValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testInvalidWindowPropertiesInSelection(): Unit = {
- val util = streamTestUtil()
val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
table
@@ -75,7 +74,6 @@ class AggregateValidationTest extends TableTestBase {
@Test(expected = classOf[RuntimeException])
def testTableFunctionInSelection(): Unit = {
- val util = streamTestUtil()
val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
util.addFunction("func", new TableFunc0)
@@ -90,7 +88,6 @@ class AggregateValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testInvalidScalarFunctionInAggregate(): Unit = {
- val util = streamTestUtil()
val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
table
@@ -102,7 +99,6 @@ class AggregateValidationTest extends TableTestBase {
@Test(expected = classOf[ValidationException])
def testInvalidTableFunctionInAggregate(): Unit = {
- val util = streamTestUtil()
val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
util.addFunction("func", new TableFunc0)
@@ -115,13 +111,52 @@ class AggregateValidationTest extends TableTestBase {
@Test(expected = classOf[RuntimeException])
def testMultipleAggregateExpressionInAggregate(): Unit = {
- val util = streamTestUtil()
- val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
-
util.addFunction("func", new TableFunc0)
+ val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
table
.groupBy('a)
// must fail. Only one AggregateFunction can be used in aggregate
.aggregate("sum(c), count(b)")
}
+
+ @Test
+ def testIllegalArgumentForListAgg(): Unit = {
+ util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd)
+ // If there are two parameters, second one must be character literal.
+ expectExceptionThrown(
+ "SELECT listagg(c, d) FROM T GROUP BY a",
+ "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)",
+ classOf[ValidationException])
+ }
+
+ @Test
+ def testIllegalArgumentForListAgg1(): Unit = {
+ util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd)
+ // If there are two parameters, second one must be character literal.
+ expectExceptionThrown(
+ "SELECT LISTAGG(c, 1) FROM T GROUP BY a",
+ "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)",
+ classOf[ValidationException])
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def expectExceptionThrown(
+ sql: String,
+ keywords: String,
+ clazz: Class[_ <: Throwable] = classOf[ValidationException])
+ : Unit = {
+ try {
+ util.tableEnv.toAppendStream[Row](util.tableEnv.sqlQuery(sql))
+ fail(s"Expected a $clazz, but no exception is thrown.")
+ } catch {
+ case e if e.getClass == clazz =>
+ if (keywords != null) {
+ assertTrue(
+ s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
+ e.getMessage.contains(keywords))
+ }
+ case e: Throwable => fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
+ }
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
index f64b0b6..eed0d3e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
@@ -138,27 +138,26 @@ class SortAggITCase
}
// NOTE: Spark has agg functions collect_list(), collect_set().
- // instead, we'll test concat_agg() here
- @Ignore
+ // instead, we'll test LISTAGG() here
@Test
def testConcatAgg(): Unit = {
checkResult(
- "SELECT concat_agg('-', c), concat_agg(c) FROM SmallTable3",
+ "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM SmallTable3",
Seq(
- row("Hi-Hello-Hello world", "Hi\nHello\nHello world")
+ row("Hi-Hello-Hello world", "Hi,Hello,Hello world")
)
)
// EmptyTable5
checkResult(
- "SELECT concat_agg('-', g), concat_agg(g) FROM EmptyTable5",
+ "SELECT LISTAGG(g, '-'), LISTAGG(g) FROM EmptyTable5",
Seq(
row(null, null)
)
)
checkResult(
- "SELECT concat_agg('-', c), concat_agg(c) FROM AllNullTable3",
+ "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM AllNullTable3",
Seq(
row(null, null)
)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 318487fe..41561a3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.planner.functions.aggfunctions.{ConcatWithRetractAggFunction, ConcatWsWithRetractAggFunction}
+import org.apache.flink.table.planner.functions.aggfunctions.{ListAggWithRetractAggFunction, ListAggWsWithRetractAggFunction}
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSumAggFunction
import org.apache.flink.table.planner.runtime.batch.sql.agg.{MyPojoAggFunction, VarArgsAggFunction}
import org.apache.flink.table.planner.runtime.utils.StreamingWithAggTestBase.AggMode
@@ -605,7 +605,7 @@ class AggregateITCase(
val sqlQuery =
s"""
- |SELECT len, concat_agg('#', content) FROM T GROUP BY len
+ |SELECT len, listagg(content, '#') FROM T GROUP BY len
""".stripMargin
val sink = new TestingRetractSink
@@ -629,7 +629,7 @@ class AggregateITCase(
val sqlQuery =
s"""
- |SELECT len, concat_agg(content) FROM T GROUP BY len
+ |SELECT len, listagg(content) FROM T GROUP BY len
""".stripMargin
val sink = new TestingRetractSink
@@ -906,7 +906,7 @@ class AggregateITCase(
"""
|SELECT b, min(c), max(c)
|FROM (
- | SELECT a, b, concat_agg(c) as c
+ | SELECT a, b, listagg(c) as c
| FROM T
| GROUP BY a, b)
|GROUP BY b
@@ -1061,15 +1061,15 @@ class AggregateITCase(
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
- /** Test CONCAT_AGG **/
+ /** Test LISTAGG **/
@Test
def testConcatAgg(): Unit = {
- tEnv.registerFunction("concat_agg_retract", new ConcatWithRetractAggFunction)
- tEnv.registerFunction("concat_agg_ws_retract", new ConcatWsWithRetractAggFunction)
+ tEnv.registerFunction("listagg_retract", new ListAggWithRetractAggFunction)
+ tEnv.registerFunction("listagg_ws_retract", new ListAggWsWithRetractAggFunction)
val sqlQuery =
s"""
|SELECT
- | concat_agg(c), concat_agg('-', c), concat_agg_retract(c), concat_agg_ws_retract('+', c)
+ | listagg(c), listagg(c, '-'), listagg_retract(c), listagg_ws_retract(c, '+')
|FROM MyTable
|GROUP BY c
|""".stripMargin
@@ -1085,8 +1085,8 @@ class AggregateITCase(
val sink = new TestingRetractSink
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink)
env.execute()
- val expected = List("Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," +
- "Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi")
+ val expected = List("Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," +
+ "Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi")
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index 659f747..f2c29c6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -125,7 +125,7 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
// @Test
// def testSimpleLogical(): Unit = {
// val t = failingDataSource(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
-// .select('c.firstValue, 'c.lastValue, 'c.concat_agg("#"))
+// .select('c.firstValue, 'c.lastValue, 'c.LISTAGG("#"))
//
// val sink = new TestingRetractSink()
// t.toRetractStream[Row].addSink(sink)