You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 03:38:19 UTC

[flink] 01/03: [FLINK-13529][table-planner-blink] Rename CONCAT_AGG to LISTAGG and fix the behavior according to the ANSI-SQL

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca414aeb4e7495bbfa2751d241aafc85bc3fa9b6
Author: beyond1920 <be...@126.com>
AuthorDate: Thu Aug 1 16:55:23 2019 +0800

    [FLINK-13529][table-planner-blink] Rename CONCAT_AGG to LISTAGG and fix the behavior according to the ANSI-SQL
    
    According to the ANSI-SQL, the LISTAGG function is used to transform values from a group of rows into a list of values that are delimited by a configurable separator.
    
    This closes #9316
---
 ...ConcatAggFunction.java => ListAggFunction.java} | 14 +++---
 ...ion.java => ListAggWithRetractAggFunction.java} | 30 ++++++------
 ...n.java => ListAggWsWithRetractAggFunction.java} | 30 ++++++------
 .../functions/sql/FlinkSqlOperatorTable.java       |  4 +-
 ...catAggFunction.java => SqlListAggFunction.java} | 27 +++++++----
 .../plan/rules/logical/SplitAggregateRule.scala    |  4 +-
 .../planner/plan/utils/AggFunctionFactory.scala    | 22 ++++-----
 .../table/planner/plan/utils/AggregateUtil.scala   |  4 +-
 ...java => ListAggWithRetractAggFunctionTest.java} | 16 +++----
 ...va => ListAggWsWithRetractAggFunctionTest.java} | 36 +++++++-------
 .../apache/flink/table/api/stream/ExplainTest.xml  | 48 +++++++++----------
 .../planner/plan/batch/sql/RemoveCollationTest.xml | 16 +++----
 .../plan/rules/logical/SplitAggregateRuleTest.xml  |  8 ++--
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 40 ++++++++--------
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 34 ++++++-------
 .../stream/sql/agg/IncrementalAggregateTest.xml    | 10 ++--
 .../flink/table/api/stream/ExplainTest.scala       |  4 +-
 .../plan/batch/sql/RemoveCollationTest.scala       |  4 +-
 .../rules/logical/SplitAggregateRuleTest.scala     |  2 +-
 .../stream/sql/MiniBatchIntervalInferTest.scala    |  6 +--
 .../stream/sql/agg/DistinctAggregateTest.scala     |  2 +-
 .../table/validation/AggregateValidationTest.scala | 55 ++++++++++++++++++----
 .../runtime/batch/sql/agg/SortAggITCase.scala      | 11 ++---
 .../runtime/stream/sql/AggregateITCase.scala       | 20 ++++----
 .../runtime/stream/table/AggregateITCase.scala     |  2 +-
 25 files changed, 245 insertions(+), 204 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
similarity index 91%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
index 2e8f5fb..377b251 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggFunction.java
@@ -32,23 +32,23 @@ import static org.apache.flink.table.planner.expressions.ExpressionBuilder.liter
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
 
 /**
- * built-in concat aggregate function.
+ * built-in listagg aggregate function.
  */
-public class ConcatAggFunction extends DeclarativeAggregateFunction {
+public class ListAggFunction extends DeclarativeAggregateFunction {
 	private int operandCount;
 	private UnresolvedReferenceExpression acc = unresolvedRef("concatAcc");
 	private UnresolvedReferenceExpression accDelimiter = unresolvedRef("accDelimiter");
 	private Expression delimiter;
 	private Expression operand;
 
-	public ConcatAggFunction(int operandCount) {
+	public ListAggFunction(int operandCount) {
 		this.operandCount = operandCount;
 		if (operandCount == 1) {
-			delimiter = literal("\n", DataTypes.STRING());
+			delimiter = literal(",", DataTypes.STRING());
 			operand = operand(0);
 		} else {
-			delimiter = operand(0);
-			operand = operand(1);
+			delimiter = operand(1);
+			operand = operand(0);
 		}
 	}
 
@@ -75,7 +75,7 @@ public class ConcatAggFunction extends DeclarativeAggregateFunction {
 	@Override
 	public Expression[] initialValuesExpressions() {
 		return new Expression[] {
-				/* delimiter */ literal("\n", DataTypes.STRING()),
+				/* delimiter */ literal(",", DataTypes.STRING()),
 				/* acc */ nullOf(DataTypes.STRING())
 		};
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
similarity index 77%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
index ea5857a..8840844 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunction.java
@@ -31,18 +31,18 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * built-in concat with retraction aggregate function.
+ * built-in listagg with retraction aggregate function.
  */
-public final class ConcatWithRetractAggFunction
-	extends AggregateFunction<BinaryString, ConcatWithRetractAggFunction.ConcatWithRetractAccumulator> {
+public final class ListAggWithRetractAggFunction
+	extends AggregateFunction<BinaryString, ListAggWithRetractAggFunction.ListAggWithRetractAccumulator> {
 
 	private static final long serialVersionUID = -2836795091288790955L;
-	private static final BinaryString lineDelimiter = BinaryString.fromString("\n");
+	private static final BinaryString lineDelimiter = BinaryString.fromString(",");
 
 	/**
-	 * The initial accumulator for concat with retraction aggregate function.
+	 * The initial accumulator for listagg with retraction aggregate function.
 	 */
-	public static class ConcatWithRetractAccumulator {
+	public static class ListAggWithRetractAccumulator {
 		public ListView<BinaryString> list = new ListView<>(BinaryStringTypeInfo.INSTANCE);
 		public ListView<BinaryString> retractList = new ListView<>(BinaryStringTypeInfo.INSTANCE);
 
@@ -55,25 +55,25 @@ public final class ConcatWithRetractAggFunction
 			if (o == null || getClass() != o.getClass()) {
 				return false;
 			}
-			ConcatWithRetractAccumulator that = (ConcatWithRetractAccumulator) o;
+			ListAggWithRetractAccumulator that = (ListAggWithRetractAccumulator) o;
 			return Objects.equals(list, that.list) &&
 				Objects.equals(retractList, that.retractList);
 		}
 	}
 
 	@Override
-	public ConcatWithRetractAccumulator createAccumulator() {
-		return new ConcatWithRetractAccumulator();
+	public ListAggWithRetractAccumulator createAccumulator() {
+		return new ListAggWithRetractAccumulator();
 	}
 
-	public void accumulate(ConcatWithRetractAccumulator acc, BinaryString value) throws Exception {
+	public void accumulate(ListAggWithRetractAccumulator acc, BinaryString value) throws Exception {
 		// ignore null value
 		if (value != null) {
 			acc.list.add(value);
 		}
 	}
 
-	public void retract(ConcatWithRetractAccumulator acc, BinaryString value) throws Exception {
+	public void retract(ListAggWithRetractAccumulator acc, BinaryString value) throws Exception {
 		if (value != null) {
 			if (!acc.list.remove(value)) {
 				acc.retractList.add(value);
@@ -81,8 +81,8 @@ public final class ConcatWithRetractAggFunction
 		}
 	}
 
-	public void merge(ConcatWithRetractAccumulator acc, Iterable<ConcatWithRetractAccumulator> its) throws Exception {
-		for (ConcatWithRetractAccumulator otherAcc : its) {
+	public void merge(ListAggWithRetractAccumulator acc, Iterable<ListAggWithRetractAccumulator> its) throws Exception {
+		for (ListAggWithRetractAccumulator otherAcc : its) {
 			// merge list of acc and other
 			List<BinaryString> buffer = new ArrayList<>();
 			for (BinaryString binaryString : acc.list.get()) {
@@ -117,7 +117,7 @@ public final class ConcatWithRetractAggFunction
 	}
 
 	@Override
-	public BinaryString getValue(ConcatWithRetractAccumulator acc) {
+	public BinaryString getValue(ListAggWithRetractAccumulator acc) {
 		try {
 			Iterable<BinaryString> accList = acc.list.get();
 			if (accList == null || !accList.iterator().hasNext()) {
@@ -131,7 +131,7 @@ public final class ConcatWithRetractAggFunction
 		}
 	}
 
-	public void resetAccumulator(ConcatWithRetractAccumulator acc) {
+	public void resetAccumulator(ListAggWithRetractAccumulator acc) {
 		acc.list.clear();
 		acc.retractList.clear();
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
similarity index 77%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
index a968cb4..317f97e 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunction.java
@@ -31,20 +31,20 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * built-in concatWs with retraction aggregate function.
+ * built-in listAggWs with retraction aggregate function.
  */
-public final class ConcatWsWithRetractAggFunction
-	extends AggregateFunction<BinaryString, ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator> {
+public final class ListAggWsWithRetractAggFunction
+	extends AggregateFunction<BinaryString, ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator> {
 
 	private static final long serialVersionUID = -8627988150350160473L;
 
 	/**
 	 * The initial accumulator for concat with retraction aggregate function.
 	 */
-	public static class ConcatWsWithRetractAccumulator {
+	public static class ListAggWsWithRetractAccumulator {
 		public ListView<BinaryString> list = new ListView<>(BinaryStringTypeInfo.INSTANCE);
 		public ListView<BinaryString> retractList = new ListView<>(BinaryStringTypeInfo.INSTANCE);
-		public BinaryString delimiter = BinaryString.fromString("\n");
+		public BinaryString delimiter = BinaryString.fromString(",");
 
 		@VisibleForTesting
 		@Override
@@ -55,7 +55,7 @@ public final class ConcatWsWithRetractAggFunction
 			if (o == null || getClass() != o.getClass()) {
 				return false;
 			}
-			ConcatWsWithRetractAccumulator that = (ConcatWsWithRetractAccumulator) o;
+			ListAggWsWithRetractAccumulator that = (ListAggWsWithRetractAccumulator) o;
 			return Objects.equals(list, that.list) &&
 				Objects.equals(retractList, that.retractList) &&
 				Objects.equals(delimiter, that.delimiter);
@@ -63,11 +63,11 @@ public final class ConcatWsWithRetractAggFunction
 	}
 
 	@Override
-	public ConcatWsWithRetractAccumulator createAccumulator() {
-		return new ConcatWsWithRetractAccumulator();
+	public ListAggWsWithRetractAccumulator createAccumulator() {
+		return new ListAggWsWithRetractAccumulator();
 	}
 
-	public void accumulate(ConcatWsWithRetractAccumulator acc, BinaryString lineDelimiter, BinaryString value) throws Exception {
+	public void accumulate(ListAggWsWithRetractAccumulator acc, BinaryString value, BinaryString lineDelimiter) throws Exception {
 		// ignore null value
 		if (value != null) {
 			acc.delimiter = lineDelimiter;
@@ -75,7 +75,7 @@ public final class ConcatWsWithRetractAggFunction
 		}
 	}
 
-	public void retract(ConcatWsWithRetractAccumulator acc, BinaryString lineDelimiter, BinaryString value) throws Exception {
+	public void retract(ListAggWsWithRetractAccumulator acc, BinaryString value, BinaryString lineDelimiter) throws Exception {
 		if (value != null) {
 			acc.delimiter = lineDelimiter;
 			if (!acc.list.remove(value)) {
@@ -84,8 +84,8 @@ public final class ConcatWsWithRetractAggFunction
 		}
 	}
 
-	public void merge(ConcatWsWithRetractAccumulator acc, Iterable<ConcatWsWithRetractAccumulator> its) throws Exception {
-		for (ConcatWsWithRetractAccumulator otherAcc : its) {
+	public void merge(ListAggWsWithRetractAccumulator acc, Iterable<ListAggWsWithRetractAccumulator> its) throws Exception {
+		for (ListAggWsWithRetractAccumulator otherAcc : its) {
 			if (!otherAcc.list.get().iterator().hasNext()
 				&& !otherAcc.retractList.get().iterator().hasNext()) {
 				// otherAcc is empty, skip it
@@ -127,7 +127,7 @@ public final class ConcatWsWithRetractAggFunction
 	}
 
 	@Override
-	public BinaryString getValue(ConcatWsWithRetractAccumulator acc) {
+	public BinaryString getValue(ListAggWsWithRetractAccumulator acc) {
 		try {
 			Iterable<BinaryString> accList = acc.list.get();
 			if (accList == null || !accList.iterator().hasNext()) {
@@ -141,8 +141,8 @@ public final class ConcatWsWithRetractAggFunction
 		}
 	}
 
-	public void resetAccumulator(ConcatWsWithRetractAccumulator acc) {
-		acc.delimiter = BinaryString.fromString("\n");
+	public void resetAccumulator(ListAggWsWithRetractAccumulator acc) {
+		acc.delimiter = BinaryString.fromString(",");
 		acc.list.clear();
 		acc.retractList.clear();
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 50ff193..dbabb69 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -946,9 +946,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFirstLastValueAggFunction LAST_VALUE = new SqlFirstLastValueAggFunction(SqlKind.LAST_VALUE);
 
 	/**
-	 * <code>CONCAT_AGG</code> aggregate function.
+	 * <code>LISTAGG</code> aggregate function.
 	 */
-	public static final SqlConcatAggFunction CONCAT_AGG = new SqlConcatAggFunction();
+	public static final SqlListAggFunction LISTAGG = new SqlListAggFunction();
 
 	/**
 	 * <code>INCR_SUM</code> aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
similarity index 71%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
index 7e215e0..9557f0c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlConcatAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlListAggFunction.java
@@ -25,30 +25,37 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeTransforms;
 
 import java.util.List;
 
 /**
- * <code>CONCAT_AGG</code> aggregate function returns the concatenation of
+ * <code>LISTAGG</code> aggregate function returns the concatenation of
  * a list of values that are input to the function.
+ *
+ * <p>NOTE: The difference between this and {@link SqlStdOperatorTable#LISTAGG} is that:
+ * (1). it constrains the second parameter to be a character literal.
+ * (2). it does not require an OVER clause to use this aggregate function.
  */
-public class SqlConcatAggFunction extends SqlAggFunction {
+public class SqlListAggFunction extends SqlAggFunction {
 
-	public SqlConcatAggFunction() {
-		super("CONCAT_AGG",
+	public SqlListAggFunction() {
+		super("LISTAGG",
 				null,
-				SqlKind.OTHER_FUNCTION,
-				ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+				SqlKind.LISTAGG,
+				ReturnTypes.ARG0_NULLABLE,
 				null,
 				OperandTypes.or(
 						OperandTypes.CHARACTER,
-						OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
-				SqlFunctionCategory.STRING,
+						OperandTypes.sequence(
+								"'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)'",
+								OperandTypes.CHARACTER,
+								OperandTypes.and(OperandTypes.CHARACTER, OperandTypes.LITERAL)
+						)),
+				SqlFunctionCategory.SYSTEM,
 				false,
 				false);
 	}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index b2bd3a5..524d5a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -334,8 +334,8 @@ object SplitAggregateRule {
       (Seq(FlinkSqlOperatorTable.FIRST_VALUE), Seq(FlinkSqlOperatorTable.FIRST_VALUE)),
     FlinkSqlOperatorTable.LAST_VALUE ->
       (Seq(FlinkSqlOperatorTable.LAST_VALUE), Seq(FlinkSqlOperatorTable.LAST_VALUE)),
-    FlinkSqlOperatorTable.CONCAT_AGG ->
-      (Seq(FlinkSqlOperatorTable.CONCAT_AGG), Seq(FlinkSqlOperatorTable.CONCAT_AGG)),
+    FlinkSqlOperatorTable.LISTAGG ->
+      (Seq(FlinkSqlOperatorTable.LISTAGG), Seq(FlinkSqlOperatorTable.LISTAGG)),
     SINGLE_VALUE -> (Seq(SINGLE_VALUE), Seq(SINGLE_VALUE))
   )
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 4dd3439..a8e00d3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu
 import org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions._
-import org.apache.flink.table.planner.functions.sql.{SqlConcatAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
+import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
@@ -113,11 +113,11 @@ class AggFunctionFactory(
       case a: SqlFirstLastValueAggFunction if a.getKind == SqlKind.LAST_VALUE =>
         createLastValueAggFunction(argTypes, index)
 
-      case _: SqlConcatAggFunction if call.getArgList.size() == 1 =>
-        createConcatAggFunction(argTypes, index)
+      case _: SqlListAggFunction if call.getArgList.size() == 1 =>
+        createListAggFunction(argTypes, index)
 
-      case _: SqlConcatAggFunction if call.getArgList.size() == 2 =>
-        createConcatWsAggFunction(argTypes, index)
+      case _: SqlListAggFunction if call.getArgList.size() == 2 =>
+        createListAggWsFunction(argTypes, index)
 
       // TODO supports SqlCardinalityCountAggFunction
 
@@ -606,23 +606,23 @@ class AggFunctionFactory(
     }
   }
 
-  private def createConcatAggFunction(
+  private def createListAggFunction(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (needRetraction(index)) {
-      new ConcatWithRetractAggFunction
+      new ListAggWithRetractAggFunction
     } else {
-      new ConcatAggFunction(1)
+      new ListAggFunction(1)
     }
   }
 
-  private def createConcatWsAggFunction(
+  private def createListAggWsFunction(
       argTypes: Array[LogicalType],
       index: Int): UserDefinedFunction = {
     if (needRetraction(index)) {
-      new ConcatWsWithRetractAggFunction
+      new ListAggWsWithRetractAggFunction
     } else {
-      new ConcatAggFunction(2)
+      new ListAggFunction(2)
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 544555a..8877be5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.planner.dataview.DataViewUtils.useNullSerializerFo
 import org.apache.flink.table.planner.dataview.{DataViewSpec, MapViewSpec}
 import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart, RexNodeConverter}
 import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
-import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlConcatAggFunction, SqlFirstLastValueAggFunction}
+import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlListAggFunction, SqlFirstLastValueAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
@@ -541,7 +541,7 @@ object AggregateUtil extends Enumeration {
              _: SqlSumAggFunction |
              _: SqlSumEmptyIsZeroAggFunction |
              _: SqlSingleValueAggFunction |
-             _: SqlConcatAggFunction => true
+             _: SqlListAggFunction => true
         case _: SqlFirstLastValueAggFunction => aggCall.getArgList.size() == 1
         case _ => false
       }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
similarity index 79%
rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
index 725d802..5c5566e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWithRetractAggFunctionTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWithRetractAggFunctionTest.java
@@ -20,17 +20,17 @@ package org.apache.flink.table.planner.functions.aggfunctions;
 
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.planner.functions.aggfunctions.ConcatWithRetractAggFunction.ConcatWithRetractAccumulator;
+import org.apache.flink.table.planner.functions.aggfunctions.ListAggWithRetractAggFunction.ListAggWithRetractAccumulator;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
 
 /**
- * Test case for built-in concat with retraction aggregate function.
+ * Test case for built-in LISTAGG with retraction aggregate function.
  */
-public class ConcatWithRetractAggFunctionTest
-	extends AggFunctionTestBase<BinaryString, ConcatWithRetractAccumulator> {
+public class ListAggWithRetractAggFunctionTest
+	extends AggFunctionTestBase<BinaryString, ListAggWithRetractAccumulator> {
 
 	@Override
 	protected List<List<BinaryString>> getInputValueSets() {
@@ -53,14 +53,14 @@ public class ConcatWithRetractAggFunctionTest
 	@Override
 	protected List<BinaryString> getExpectedResults() {
 		return Arrays.asList(
-				BinaryString.fromString("a\nb\nc\nd\ne\nf"),
+				BinaryString.fromString("a,b,c,d,e,f"),
 				null,
 				BinaryString.fromString("a"));
 	}
 
 	@Override
-	protected AggregateFunction<BinaryString, ConcatWithRetractAccumulator> getAggregator() {
-		return new ConcatWithRetractAggFunction();
+	protected AggregateFunction<BinaryString, ListAggWithRetractAccumulator> getAggregator() {
+		return new ListAggWithRetractAggFunction();
 	}
 
 	@Override
@@ -75,6 +75,6 @@ public class ConcatWithRetractAggFunctionTest
 
 	@Override
 	protected Class<?> getAccClass() {
-		return ConcatWithRetractAccumulator.class;
+		return ListAggWithRetractAccumulator.class;
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
similarity index 78%
rename from flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
rename to flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
index d9e06d0..67ee8ff 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ConcatWsWithRetractAggFunctionTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/aggfunctions/ListAggWsWithRetractAggFunctionTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.functions.aggfunctions;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.planner.functions.aggfunctions.ConcatWsWithRetractAggFunction.ConcatWsWithRetractAccumulator;
+import org.apache.flink.table.planner.functions.aggfunctions.ListAggWsWithRetractAggFunction.ListAggWsWithRetractAccumulator;
 import org.apache.flink.util.Preconditions;
 
 import java.lang.reflect.InvocationTargetException;
@@ -34,8 +34,8 @@ import static org.junit.Assert.assertEquals;
 /**
  * Test case for built-in concatWs with retraction aggregate function.
  */
-public class ConcatWsWithRetractAggFunctionTest
-	extends AggFunctionTestBase<BinaryString, ConcatWsWithRetractAccumulator> {
+public class ListAggWsWithRetractAggFunctionTest
+	extends AggFunctionTestBase<BinaryString, ListAggWsWithRetractAccumulator> {
 
 	@Override
 	protected List<List<BinaryString>> getInputValueSets() {
@@ -84,8 +84,8 @@ public class ConcatWsWithRetractAggFunctionTest
 	}
 
 	@Override
-	protected AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> getAggregator() {
-		return new ConcatWsWithRetractAggFunction();
+	protected AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> getAggregator() {
+		return new ListAggWsWithRetractAggFunction();
 	}
 
 	@Override
@@ -101,14 +101,14 @@ public class ConcatWsWithRetractAggFunctionTest
 
 	@Override
 	protected Class<?> getAccClass() {
-		return ConcatWsWithRetractAccumulator.class;
+		return ListAggWsWithRetractAccumulator.class;
 	}
 
 	@Override
 	protected <E> void validateResult(E expected, E result) {
-		if (expected instanceof ConcatWsWithRetractAccumulator && result instanceof ConcatWsWithRetractAccumulator) {
-			ConcatWsWithRetractAccumulator e = (ConcatWsWithRetractAccumulator) expected;
-			ConcatWsWithRetractAccumulator r = (ConcatWsWithRetractAccumulator) result;
+		if (expected instanceof ListAggWsWithRetractAccumulator && result instanceof ListAggWsWithRetractAccumulator) {
+			ListAggWsWithRetractAccumulator e = (ListAggWsWithRetractAccumulator) expected;
+			ListAggWsWithRetractAccumulator r = (ListAggWsWithRetractAccumulator) result;
 			assertEquals(e.list, r.list);
 			assertEquals(e.list, r.list);
 		} else {
@@ -117,31 +117,31 @@ public class ConcatWsWithRetractAggFunctionTest
 	}
 
 	@Override
-	protected ConcatWsWithRetractAccumulator accumulateValues(List<BinaryString> values)
+	protected ListAggWsWithRetractAccumulator accumulateValues(List<BinaryString> values)
 			throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-		AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator();
-		ConcatWsWithRetractAccumulator accumulator = getAggregator().createAccumulator();
+		AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator();
+		ListAggWsWithRetractAccumulator accumulator = getAggregator().createAccumulator();
 		Method accumulateFunc = getAccumulateFunc();
 		Preconditions.checkArgument(values.size() % 2 == 0,
 				"number of values must be an integer multiple of 2.");
 		for (int i = 0; i < values.size(); i += 2) {
-			BinaryString value = values.get(i);
-			BinaryString delimiter = values.get(i + 1);
+			BinaryString value = values.get(i + 1);
+			BinaryString delimiter = values.get(i);
 			accumulateFunc.invoke(aggregator, accumulator, delimiter, value);
 		}
 		return accumulator;
 	}
 
 	@Override
-	protected void retractValues(ConcatWsWithRetractAccumulator accumulator, List<BinaryString> values)
+	protected void retractValues(ListAggWsWithRetractAccumulator accumulator, List<BinaryString> values)
 			throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-		AggregateFunction<BinaryString, ConcatWsWithRetractAccumulator> aggregator = getAggregator();
+		AggregateFunction<BinaryString, ListAggWsWithRetractAccumulator> aggregator = getAggregator();
 		Method retractFunc = getRetractFunc();
 		Preconditions.checkArgument(values.size() % 2 == 0,
 				"number of values must be an integer multiple of 2.");
 		for (int i = 0; i < values.size(); i += 2) {
-			BinaryString value = values.get(i);
-			BinaryString delimiter = values.get(i + 1);
+			BinaryString value = values.get(i + 1);
+			BinaryString delimiter = values.get(i);
 			retractFunc.invoke(aggregator, accumulator, delimiter, value);
 		}
 	}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index fd84d11..16d3278 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -637,8 +637,8 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'#'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -649,8 +649,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -670,15 +670,15 @@ Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc]
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
 
 Sink(name=[appendSink1], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
    +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-      +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+      +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1], updateAsRetraction=[false], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
    +- Exchange(distribution=[hash[id1]], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
-      +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
+      +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3], updateAsRetraction=[true], accMode=[Acc]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
          +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
@@ -713,11 +713,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+								content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 									ship_strategy : HASH
 
 									 : Operator
@@ -725,11 +725,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+											content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
 											ship_strategy : FORWARD
 
 											 : Operator
-												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 												ship_strategy : HASH
 
 												 : Operator
@@ -784,8 +784,8 @@ Union(all=[true], union=[a, b, c])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], $f2=[_UTF-16LE'#'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[TUMBLE($1, 8000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'#'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -796,8 +796,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], $f2=[_UTF-16LE'*'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject(id1=[$0], $f1=[HOP($1, 12000:INTERVAL SECOND, 6000:INTERVAL SECOND)], text=[$2], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$0], ts=[$2], text=[$1])
             +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -817,15 +817,15 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, cnt, name, goods, rowtime])
 
 Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[id1, ts, _UTF-16LE'#' AS $f2, text])
+      +- Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[id1, ts, _UTF-16LE'*' AS $f2, text])
+      +- Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3])
          +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
@@ -860,11 +860,11 @@ Sink(name=[appendSink2], fields=[a, b])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (id1, ts, _UTF-16LE'#' AS $f2, text))
+								content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 									ship_strategy : HASH
 
 									 : Operator
@@ -872,11 +872,11 @@ Sink(name=[appendSink2], fields=[a, b])
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : Calc(select: (id1, ts, _UTF-16LE'*' AS $f2, text))
+											content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
 											ship_strategy : FORWARD
 
 											 : Operator
-												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 												ship_strategy : HASH
 
 												 : Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
index fd8003b..76b6419 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
@@ -310,7 +310,7 @@ v2 as (
 ),
 
 join_tb2 as (
- select tb1_id, concat_agg(tb2_name, ',') as tb2_names
+ select tb1_id, LISTAGG(tb2_name, ',') as tb2_names
  from (
   select v1.id as tb1_id, tb2.name as tb2_name
    from v1 left outer join tb2 on tb2_id = tb2.id
@@ -318,7 +318,7 @@ join_tb2 as (
 ),
 
 join_tb3 as (
- select tb1_id, concat_agg(tb3_name, ',') as tb3_names
+ select tb1_id, LISTAGG(tb3_name, ',') as tb3_names
  from (
   select v2.id as tb1_id, tb3.name as tb3_name
    from v2 left outer join tb3 on tb3_id = tb3.id
@@ -349,7 +349,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], name=[$4], tb2_names=[$6], t
    :  :- LogicalJoin(condition=[=($0, $7)], joinType=[left])
    :  :  :- LogicalJoin(condition=[=($0, $5)], joinType=[left])
    :  :  :  :- LogicalTableScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]])
-   :  :  :  +- LogicalAggregate(group=[{0}], tb2_names=[CONCAT_AGG($1, $2)])
+   :  :  :  +- LogicalAggregate(group=[{0}], tb2_names=[LISTAGG($1, $2)])
    :  :  :     +- LogicalProject(tb1_id=[$0], tb2_name=[$3], $f2=[_UTF-16LE','])
    :  :  :        +- LogicalJoin(condition=[=($1, $2)], joinType=[left])
    :  :  :           :- LogicalProject(id=[$0], tb2_id=[$5])
@@ -357,7 +357,7 @@ LogicalProject(id=[$0], tb2_ids=[$2], tb3_ids=[$3], name=[$4], tb2_names=[$6], t
    :  :  :           :     :- LogicalTableScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]])
    :  :  :           :     +- LogicalTableFunctionScan(invocation=[split($cor0.tb2_ids)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;])
    :  :  :           +- LogicalTableScan(table=[[default_catalog, default_database, tb2, source: [TestTableSource(id, name)]]])
-   :  :  +- LogicalAggregate(group=[{0}], tb3_names=[CONCAT_AGG($1, $2)])
+   :  :  +- LogicalAggregate(group=[{0}], tb3_names=[LISTAGG($1, $2)])
    :  :     +- LogicalProject(tb1_id=[$0], tb3_name=[$3], $f2=[_UTF-16LE','])
    :  :        +- LogicalJoin(condition=[=($1, $2)], joinType=[left])
    :  :           :- LogicalProject(id=[$0], tb3_id=[$5])
@@ -382,10 +382,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, tb3_names, name0, name1])
    :     :        :  +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(id, tb1_id)], select=[id, key, tb2_ids, tb3_ids, name, tb1_id, tb2_names], rightSorted=[true])
    :     :        :     :- Exchange(distribution=[hash[id]])
    :     :        :     :  +- TableSourceScan(table=[[default_catalog, default_database, tb1, source: [TestTableSource(id, key, tb2_ids, tb3_ids, name)]]], fields=[id, key, tb2_ids, tb3_ids, name], reuse_id=[1])
-   :     :        :     +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb2_names])
+   :     :        :     +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb2_names])
    :     :        :        +- Sort(orderBy=[tb1_id ASC])
    :     :        :           +- Exchange(distribution=[hash[tb1_id]])
-   :     :        :              +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_CONCAT_AGG(tb2_name, $f2) AS (accDelimiter$0, concatAcc$1)])
+   :     :        :              +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_LISTAGG(tb2_name, $f2) AS (accDelimiter$0, concatAcc$1)])
    :     :        :                 +- Sort(orderBy=[tb1_id ASC])
    :     :        :                    +- Calc(select=[id AS tb1_id, name AS tb2_name, _UTF-16LE',' AS $f2])
    :     :        :                       +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb2_id, id0)], select=[id, tb2_id, id0, name])
@@ -395,10 +395,10 @@ Calc(select=[id, tb2_ids, tb3_ids, name, tb2_names, tb3_names, name0, name1])
    :     :        :                          :        +- Reused(reference_id=[1])
    :     :        :                          +- Exchange(distribution=[hash[id]])
    :     :        :                             +- TableSourceScan(table=[[default_catalog, default_database, tb2, source: [TestTableSource(id, name)]]], fields=[id, name])
-   :     :        +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_CONCAT_AGG(accDelimiter$0, concatAcc$1) AS tb3_names])
+   :     :        +- SortAggregate(isMerge=[true], groupBy=[tb1_id], select=[tb1_id, Final_LISTAGG(accDelimiter$0, concatAcc$1) AS tb3_names])
    :     :           +- Sort(orderBy=[tb1_id ASC])
    :     :              +- Exchange(distribution=[hash[tb1_id]])
-   :     :                 +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_CONCAT_AGG(tb3_name, $f2) AS (accDelimiter$0, concatAcc$1)])
+   :     :                 +- LocalSortAggregate(groupBy=[tb1_id], select=[tb1_id, Partial_LISTAGG(tb3_name, $f2) AS (accDelimiter$0, concatAcc$1)])
    :     :                    +- Sort(orderBy=[tb1_id ASC])
    :     :                       +- Calc(select=[id AS tb1_id, name AS tb3_name, _UTF-16LE',' AS $f2])
    :     :                          +- SortMergeJoin(joinType=[LeftOuterJoin], where=[=(tb3_id, id0)], select=[id, tb3_id, id0, name])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
index a57c307..b59a9c8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -171,19 +171,19 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[MIN($3)], agg#1=[MAX($4)], agg#2=[SUM(
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalAggregate(group=[{0}], agg#0=[CONCAT_AGG($2)], agg#1=[$SUM0($3)])
-+- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[CONCAT_AGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
+FlinkLogicalAggregate(group=[{0}], agg#0=[LISTAGG($2)], agg#1=[$SUM0($3)])
++- FlinkLogicalAggregate(group=[{0, 3}], agg#0=[LISTAGG($2) FILTER $4], agg#1=[COUNT(DISTINCT $1) FILTER $5])
    +- FlinkLogicalCalc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
       +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
          +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 19d3208..9a1d03b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -220,11 +220,11 @@ Calc(select=[a, b])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(name=[appendSink1], fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'*'], text=[$1])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*'])
          +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-            +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
-               +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+            +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+               +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                   +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
                      +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                         +- LogicalJoin(condition=[true], joinType=[inner])
@@ -235,8 +235,8 @@ LogicalSink(name=[appendSink1], fields=[a, b])
 
 LogicalSink(name=[appendSink2], fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[CONCAT_AGG($2, $3)])
-      +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'-'], text=[$2])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+      +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-'])
          +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
             +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
@@ -249,8 +249,8 @@ LogicalSink(name=[appendSink3], fields=[a, b])
 +- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
    +- LogicalProject(id1=[$0], text=[$1])
       +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-         +- LogicalAggregate(group=[{0, 1}], text=[CONCAT_AGG($2, $3)])
-            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], $f2=[_UTF-16LE'#'], text=[$2])
+         +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
                   +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                      +- LogicalJoin(condition=[true], joinType=[inner])
@@ -269,21 +269,21 @@ Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
       +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
          +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
 
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
+GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
 +- Exchange(distribution=[hash[id1]])
-   +- Calc(select=[ts, id1, _UTF-16LE'#' AS $f2, text])
+   +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
       +- Reused(reference_id=[1])
 
 Sink(name=[appendSink1], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text])
+      +- Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3])
          +- Reused(reference_id=[2])
 
 Sink(name=[appendSink2], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, CONCAT_AGG($f2, text) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[ts, id1, _UTF-16LE'-' AS $f2, text])
+      +- Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
          +- Reused(reference_id=[1])
 
 Sink(name=[appendSink3], fields=[a, b])
@@ -325,19 +325,19 @@ Sink(name=[appendSink3], fields=[a, b])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (ts, id1, _UTF-16LE'#' AS $f2, text))
+								content : Calc(select: (ts, id1, text, _UTF-16LE'#' AS $f3))
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
+									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
 									ship_strategy : HASH
 
 									 : Operator
-										content : Calc(select: (w$rowtime AS ts, id1, _UTF-16LE'*' AS $f2, text))
+										content : Calc(select: (w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3))
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+											content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 											ship_strategy : HASH
 
 											 : Operator
@@ -345,11 +345,11 @@ Sink(name=[appendSink3], fields=[a, b])
 												ship_strategy : FORWARD
 
 												 : Operator
-													content : Calc(select: (ts, id1, _UTF-16LE'-' AS $f2, text))
+													content : Calc(select: (ts, id1, text, _UTF-16LE'-' AS $f3))
 													ship_strategy : FORWARD
 
 													 : Operator
-														content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, CONCAT_AGG($f2, text) AS EXPR$1)
+														content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
 														ship_strategy : HASH
 
 														 : Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index e9ec761..dc74eb4 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -677,18 +677,18 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
+GroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS EXPR$1, COUNT(DISTINCT b) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
    +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -697,20 +697,20 @@ GroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS EXPR$1, COUNT(DISTINCT b
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
+GlobalGroupAggregate(groupBy=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS EXPR$1, COUNT(distinct$0 count$2) AS EXPR$2])
 +- Exchange(distribution=[hash[a]])
-   +- LocalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
+   +- LocalGroupAggregate(groupBy=[a], select=[a, LISTAGG(c) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) AS count$2, DISTINCT(b) AS distinct$0])
       +- WatermarkAssigner(fields=[a, b, c], miniBatchInterval=[Proctime, 1000ms])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
@@ -718,20 +718,20 @@ GlobalGroupAggregate(groupBy=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatA
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
+GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS $f1, $SUM0_RETRACT($f3_0) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
+   +- GroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f3_0])
       +- Exchange(distribution=[hash[a, $f3]])
          +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
             +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
@@ -743,23 +743,23 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETR
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT(concat_agg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT(listagg$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG_RETRACT($f2) AS concat_agg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2])
-      +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0])
+   +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f2) AS listagg$0, $SUM0_RETRACT($f3_0) AS sum$1, COUNT_RETRACT(*) AS count1$2])
+      +- GlobalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f2, COUNT(distinct$0 count$2) AS $f3_0])
          +- Exchange(distribution=[hash[a, $f3]])
-            +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+            +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
                +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
                   +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                      +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index a415121..ccef832 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -196,22 +196,22 @@ GlobalGroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, MIN_RETRA
   </TestCase>
   <TestCase name="testSingleConcatAggWithDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[CONCAT_AGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
+LogicalAggregate(group=[{0}], EXPR$1=[LISTAGG($1)], EXPR$2=[COUNT(DISTINCT $2)])
 +- LogicalProject(a=[$0], c=[$2], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS $f1, $SUM0(count$2) AS $f2])
 +- Exchange(distribution=[hash[a]])
-   +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, CONCAT_AGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
+   +- IncrementalGroupAggregate(partialAggGrouping=[a, $f3], finalAggGrouping=[a], select=[a, LISTAGG((accDelimiter$0, concatAcc$1)) AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 count$2) AS count$2])
       +- Exchange(distribution=[hash[a, $f3]])
-         +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, CONCAT_AGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
+         +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, LISTAGG(c) FILTER $g_1 AS (accDelimiter$0, concatAcc$1), COUNT(distinct$0 b) FILTER $g_0 AS count$2, DISTINCT(b) AS distinct$0])
             +- Calc(select=[a, b, c, $f3, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
                +- Expand(projects=[{a=[$0], b=[$1], c=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], c=[$2], $f3=[null], $e=[1]}])
                   +- Calc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index 9952e34..198fb0f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -131,7 +131,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
     val table1 = util.tableEnv.sqlQuery(
       """
-        |SELECT id1, CONCAT_AGG('#', text)
+        |SELECT id1, LISTAGG(text, '#')
         |FROM TempTable
         |GROUP BY id1, TUMBLE(ts, INTERVAL '8' SECOND)
       """.stripMargin)
@@ -140,7 +140,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase {
 
     val table2 = util.tableEnv.sqlQuery(
       """
-        |SELECT id1, CONCAT_AGG('*', text)
+        |SELECT id1, LISTAGG(text, '*')
         |FROM TempTable
         |GROUP BY id1, HOP(ts, INTERVAL '12' SECOND, INTERVAL '6' SECOND)
       """.stripMargin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
index 73c202d..5a5bdf5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.scala
@@ -307,7 +307,7 @@ class RemoveCollationTest extends TableTestBase {
         |),
         |
         |join_tb2 as (
-        | select tb1_id, concat_agg(tb2_name, ',') as tb2_names
+        | select tb1_id, LISTAGG(tb2_name, ',') as tb2_names
         | from (
         |  select v1.id as tb1_id, tb2.name as tb2_name
         |   from v1 left outer join tb2 on tb2_id = tb2.id
@@ -315,7 +315,7 @@ class RemoveCollationTest extends TableTestBase {
         |),
         |
         |join_tb3 as (
-        | select tb1_id, concat_agg(tb3_name, ',') as tb3_names
+        | select tb1_id, LISTAGG(tb3_name, ',') as tb3_names
         | from (
         |  select v2.id as tb1_id, tb3.name as tb3_name
         |   from v2 left outer join tb3 on tb3_id = tb3.id
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index b91649b..5d58dfb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -95,7 +95,7 @@ class SplitAggregateRuleTest extends TableTestBase {
 
   @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
-    util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
+    util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index ce9e042..9de3886 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -297,7 +297,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     val table2 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('#', text) as text,
+        |    LISTAGG(text, '#') as text,
         |    TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts
         |FROM TempTable1
         |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
@@ -307,7 +307,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   val table3 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('*', text)
+        |    LISTAGG(text, '*')
         |FROM TempTable2
         |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1
       """.stripMargin)
@@ -317,7 +317,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     val table4 = util.tableEnv.sqlQuery(
       """
         |SELECT id1,
-        |    CONCAT_AGG('-', text)
+        |    LISTAGG(text, '-')
         |FROM TempTable1
         |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1
       """.stripMargin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 7a30103..8e36305 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -103,7 +103,7 @@ class DistinctAggregateTest(
 
   @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
-    util.verifyPlan("SELECT a, CONCAT_AGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
+    util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
index 5ce79fc..99ed303 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.scala
@@ -22,14 +22,16 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.utils.{TableFunc0, TableTestBase}
+import org.apache.flink.types.Row
 
+import org.junit.Assert.{assertTrue, fail}
 import org.junit.Test
 
 class AggregateValidationTest extends TableTestBase {
+  private val util = scalaStreamTestUtil()
 
   @Test(expected = classOf[ValidationException])
   def testGroupingOnNonExistentField(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     val ds = table
@@ -40,7 +42,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testGroupingInvalidSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -51,7 +52,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidAggregationInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -63,7 +63,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWindowPropertiesInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -75,7 +74,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[RuntimeException])
   def testTableFunctionInSelection(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     util.addFunction("func", new TableFunc0)
@@ -90,7 +88,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidScalarFunctionInAggregate(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     table
@@ -102,7 +99,6 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidTableFunctionInAggregate(): Unit = {
-    val util = streamTestUtil()
     val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
 
     util.addFunction("func", new TableFunc0)
@@ -115,13 +111,52 @@ class AggregateValidationTest extends TableTestBase {
 
   @Test(expected = classOf[RuntimeException])
   def testMultipleAggregateExpressionInAggregate(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
-
     util.addFunction("func", new TableFunc0)
+    val table = util.addTableSource[(Long, Int, String)]('a, 'b, 'c)
     table
       .groupBy('a)
       // must fail. Only one AggregateFunction can be used in aggregate
       .aggregate("sum(c), count(b)")
   }
+
+  @Test
+  def testIllegalArgumentForListAgg(): Unit = {
+    util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd)
+    // If there are two parameters, second one must be character literal.
+    expectExceptionThrown(
+      "SELECT listagg(c, d) FROM T GROUP BY a",
+    "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)",
+      classOf[ValidationException])
+  }
+
+  @Test
+  def testIllegalArgumentForListAgg1(): Unit = {
+    util.addTableSource[(Long, Int, String, String)]("T", 'a, 'b, 'c, 'd)
+    // If there are two parameters, second one must be character literal.
+    expectExceptionThrown(
+      "SELECT LISTAGG(c, 1) FROM T GROUP BY a",
+      "Supported form(s): 'LISTAGG(<CHARACTER>)'\n'LISTAGG(<CHARACTER>, <CHARACTER_LITERAL>)",
+      classOf[ValidationException])
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  private def expectExceptionThrown(
+      sql: String,
+      keywords: String,
+      clazz: Class[_ <: Throwable] = classOf[ValidationException])
+  : Unit = {
+    try {
+      util.tableEnv.toAppendStream[Row](util.tableEnv.sqlQuery(sql))
+      fail(s"Expected a $clazz, but no exception is thrown.")
+    } catch {
+      case e if e.getClass == clazz =>
+        if (keywords != null) {
+          assertTrue(
+            s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
+            e.getMessage.contains(keywords))
+        }
+      case e: Throwable => fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
+    }
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
index f64b0b6..eed0d3e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
@@ -138,27 +138,26 @@ class SortAggITCase
   }
 
   // NOTE: Spark has agg functions collect_list(), collect_set().
-  //       instead, we'll test concat_agg() here
-  @Ignore
+  //       instead, we'll test LISTAGG() here
   @Test
   def testConcatAgg(): Unit = {
     checkResult(
-      "SELECT concat_agg('-', c), concat_agg(c) FROM SmallTable3",
+      "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM SmallTable3",
       Seq(
-        row("Hi-Hello-Hello world", "Hi\nHello\nHello world")
+        row("Hi-Hello-Hello world", "Hi,Hello,Hello world")
       )
     )
 
     // EmptyTable5
     checkResult(
-      "SELECT concat_agg('-', g), concat_agg(g) FROM EmptyTable5",
+      "SELECT LISTAGG(g, '-'), LISTAGG(g) FROM EmptyTable5",
       Seq(
         row(null, null)
       )
     )
 
     checkResult(
-      "SELECT concat_agg('-', c), concat_agg(c) FROM AllNullTable3",
+      "SELECT LISTAGG(c, '-'), LISTAGG(c) FROM AllNullTable3",
       Seq(
         row(null, null)
       )
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 318487fe..41561a3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.planner.functions.aggfunctions.{ConcatWithRetractAggFunction, ConcatWsWithRetractAggFunction}
+import org.apache.flink.table.planner.functions.aggfunctions.{ListAggWithRetractAggFunction, ListAggWsWithRetractAggFunction}
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSumAggFunction
 import org.apache.flink.table.planner.runtime.batch.sql.agg.{MyPojoAggFunction, VarArgsAggFunction}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithAggTestBase.AggMode
@@ -605,7 +605,7 @@ class AggregateITCase(
 
     val sqlQuery =
       s"""
-         |SELECT len, concat_agg('#', content) FROM T GROUP BY len
+         |SELECT len, listagg(content, '#') FROM T GROUP BY len
        """.stripMargin
 
     val sink = new TestingRetractSink
@@ -629,7 +629,7 @@ class AggregateITCase(
 
     val sqlQuery =
       s"""
-         |SELECT len, concat_agg(content) FROM T GROUP BY len
+         |SELECT len, listagg(content) FROM T GROUP BY len
        """.stripMargin
 
     val sink = new TestingRetractSink
@@ -906,7 +906,7 @@ class AggregateITCase(
       """
         |SELECT b, min(c), max(c)
         |FROM (
-        | SELECT a, b, concat_agg(c) as c
+        | SELECT a, b, listagg(c) as c
         | FROM T
         | GROUP BY a, b)
         |GROUP BY b
@@ -1061,15 +1061,15 @@ class AggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  /** Test CONCAT_AGG **/
+  /** Test LISTAGG **/
   @Test
   def testConcatAgg(): Unit = {
-    tEnv.registerFunction("concat_agg_retract", new ConcatWithRetractAggFunction)
-    tEnv.registerFunction("concat_agg_ws_retract", new ConcatWsWithRetractAggFunction)
+    tEnv.registerFunction("listagg_retract", new ListAggWithRetractAggFunction)
+    tEnv.registerFunction("listagg_ws_retract", new ListAggWsWithRetractAggFunction)
     val sqlQuery =
       s"""
          |SELECT
-         |  concat_agg(c), concat_agg('-', c), concat_agg_retract(c), concat_agg_ws_retract('+', c)
+         |  listagg(c), listagg(c, '-'), listagg_retract(c), listagg_ws_retract(c, '+')
          |FROM MyTable
          |GROUP BY c
          |""".stripMargin
@@ -1085,8 +1085,8 @@ class AggregateITCase(
     val sink = new TestingRetractSink
     tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink)
     env.execute()
-    val expected = List("Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," +
-      "Hi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi\nHi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi")
+    val expected = List("Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi-Hi," +
+      "Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi,Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi+Hi")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
index 659f747..f2c29c6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala
@@ -125,7 +125,7 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
 //  @Test
 //  def testSimpleLogical(): Unit = {
 //    val t = failingDataSource(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
-//      .select('c.firstValue, 'c.lastValue, 'c.concat_agg("#"))
+//      .select('c.firstValue, 'c.lastValue, 'c.LISTAGG("#"))
 //
 //    val sink = new TestingRetractSink()
 //    t.toRetractStream[Row].addSink(sink)